diff --git a/src/apps/ipfix/ipfix.lua b/src/apps/ipfix/ipfix.lua index 607fa9ba8c..67c20d04b5 100644 --- a/src/apps/ipfix/ipfix.lua +++ b/src/apps/ipfix/ipfix.lua @@ -24,7 +24,7 @@ local ctable = require("lib.ctable") local C = ffi.C local htonl, htons = lib.htonl, lib.htons -local metadata_add = metadata.add +local metadata_add, metadata_get = metadata.add, metadata.get local debug = lib.getenv("FLOW_EXPORT_DEBUG") @@ -206,7 +206,8 @@ function FlowSet:new (spec, args) o.incoming_link_name, o.incoming = new_internal_link('IPFIX incoming') -- Generic per-template counters - o.shm = shm.create_frame("templates/"..template.id, + local shm_name = "ipfix_templates/"..args.instance.."/"..template.id + o.shm = shm.create_frame(shm_name, { packets_in = { counter }, flow_export_packets = { counter }, exported_flows = { counter }, @@ -222,7 +223,7 @@ function FlowSet:new (spec, args) conf[name] = { counter, 0 } end o.shm_template = - shm.create_frame("templates/"..template.id.."/stats", conf) + shm.create_frame(shm_name.."/stats", conf) end return setmetatable(o, { __index = self }) end @@ -390,7 +391,11 @@ local ipfix_config_params = { collector_port = { required = true }, templates = { default = { "v4", "v6" } }, maps = { default = {} }, - maps_log_fh = { default = nil } + maps_log_fh = { default = nil }, + -- Used to distinguish instances of the app running in the same + -- process + instance = { default = 1 }, + add_packet_metadata = { default = true } } local function setup_transport_header(self, config) @@ -428,7 +433,9 @@ function IPFIX:new(config) template_refresh_interval = config.template_refresh_interval, next_template_refresh = -1, version = config.ipfix_version, - observation_domain = config.observation_domain } + observation_domain = config.observation_domain, + instance = config.instance, + add_packet_metadata = config.add_packet_metadata } o.shm = { -- Total number of packets received received_packets = { counter }, @@ -469,7 +476,9 @@ function IPFIX:new(config) flush_timeout = config.flush_timeout, parent = o, maps = config.maps, - maps_log_fh = config.maps_log_fh } + maps_log_fh = config.maps_log_fh, + instance = config.instance } + o.flow_sets = {} for _, template in ipairs(config.templates) do table.insert(o.flow_sets, FlowSet:new(template, flow_set_args)) @@ -556,10 +565,19 @@ function IPFIX:push() local flow_sets = self.flow_sets local nreadable = link.nreadable(input) counter.add(self.shm.received_packets, nreadable) + + if self.add_packet_metadata then + for _ = 1, nreadable do + local p = link.receive(input) + metadata_add(p) + link.transmit(input, p) + end + end + for _,set in ipairs(flow_sets) do for _ = 1, nreadable do local p = link.receive(input) - local md = metadata_add(p) + local md = metadata_get(p) if set.match(md.filter_start, md.filter_length) then link.transmit(set.incoming, p) else diff --git a/src/apps/ipfix/template.lua b/src/apps/ipfix/template.lua index bff8173056..e0a5c6a71f 100644 --- a/src/apps/ipfix/template.lua +++ b/src/apps/ipfix/template.lua @@ -14,12 +14,14 @@ local ipv4 = require("lib.protocol.ipv4") local metadata = require("apps.rss.metadata") local strings = require("apps.ipfix.strings") local dns = require("apps.ipfix.dns") +local S = require("syscall") local ntohs = lib.ntohs local htonl, htons = lib.htonl, lib.htons local function htonq(v) return bit.bswap(v + 0ULL) end local metadata_get = metadata.get local ether_header_ptr_t = metadata.ether_header_ptr_t +local log_pid = string.format("[%5d]", S.getpid()) local function ptr_to(ctype) return ffi.typeof('$*', ctype) end @@ -212,7 +214,8 @@ function make_template_info(spec) record_ptr_t = ptr_to(record_t), swap_fn = gen_swap_fn(), match = pf.compile_filter(spec.filter), - logger = lib.logger_new({ module = "IPFIX template #"..spec.id }), + logger = lib.logger_new({module = log_pid + .." IPFIX template #"..spec.id }), counters = spec.counters, counters_names = counters_names, extract = spec.extract, diff --git a/src/program/ipfix/lib.lua b/src/program/ipfix/lib.lua new file mode 100644 index 0000000000..03a5f63e14 --- /dev/null +++ b/src/program/ipfix/lib.lua @@ -0,0 +1,274 @@ +module(..., package.seeall) + +local now = require("core.app").now +local lib = require("core.lib") +local shm = require("core.shm") +local counter = require("core.counter") +local app_graph = require("core.config") +local link = require("core.link") +local pci = require("lib.hardware.pci") +local numa = require("lib.numa") +local ipv4 = require("lib.protocol.ipv4") +local ethernet = require("lib.protocol.ethernet") +local macaddress = require("lib.macaddress") +local S = require("syscall") +local interlink = require("lib.interlink") +local basic = require("apps.basic.basic_apps") +local arp = require("apps.ipv4.arp") +local ipfix = require("apps.ipfix.ipfix") +local template = require("apps.ipfix.template") + +-- apps that can be used as an input or output for the exporter +local in_apps, out_apps = {}, {} + +local function parse_spec (spec, delimiter) + local t = {} + for s in spec:split(delimiter or ':') do + table.insert(t, s) + end + return t +end + +function in_apps.pcap (path) + return { input = "input", + output = "output" }, + { require("apps.pcap.pcap").PcapReader, path } +end + +function out_apps.pcap (path) + return { input = "input", + output = "output" }, + { require("apps.pcap.pcap").PcapWriter, path } +end + +function out_apps.tap_routed (device) + return { input = "input", + output = "output" }, + { require("apps.tap.tap").Tap, { name = device } } +end + +function in_apps.raw (device) + return { input = "rx", + output = "tx" }, + { require("apps.socket.raw").RawSocket, device } +end +out_apps.raw = in_apps.raw + +function in_apps.tap (device) + return { input = "input", + output = "output" }, + { require("apps.tap.tap").Tap, device } +end +out_apps.tap = in_apps.tap + +function in_apps.interlink (name) + return { input = nil, + output = "output" }, + { require("apps.interlink.receiver"), { name = name, create = true } } +end + +function in_apps.pci (spec) + local device, rxq = unpack(parse_spec(spec, '/')) + local device_info = pci.device_info(device) + local conf = { pciaddr = device } + if device_info.driver == 'apps.intel_mp.intel_mp' then + local rxq = (rxq and tonumber(rxq)) or 0 + conf.rxq = rxq + conf.rxcounter = rxq + conf.ring_buffer_size = 32768 + end + return { input = device_info.rx, output = device_info.tx }, + { require(device_info.driver).driver, conf } +end +out_apps.pci = in_apps.pci + +probe_config = { + -- Probe-specific + output_type = {required = true}, + output = { required = true }, + input_type = { default = nil }, + input = { default = nil }, + exporter_mac = { default = nil }, + -- Passed on to IPFIX app + active_timeout = { default = nil }, + idle_timeout = { default = nil }, + flush_timeout = { default = nil }, + cache_size = { default = nil }, + scan_time = { default = nil }, + observation_domain = { default = nil }, + template_refresh_interval = { default = nil }, + ipfix_version = { default = nil }, + exporter_ip = { required = true }, + collector_ip = { required = true }, + collector_port = { required = true }, + mtu = { default = nil }, + templates = { required = true }, + maps = { default = {} }, + maps_logfile = { default = nil }, + instance = { default = 1 }, + add_packet_metadata = { default = true } +} + +function configure_graph (arg, in_graph) + local config = lib.parse(arg, probe_config) + + local in_link, in_app + if config.input_type then + assert(in_apps[config.input_type], + "unknown input type: "..config.input_type) + assert(config.input, "Missing input parameter") + in_link, in_app = in_apps[config.input_type](config.input) + end + assert(out_apps[config.output_type], + "unknown output type: "..config.output_type) + local out_link, out_app = out_apps[config.output_type](config.output) + + if config.output_type == "tap_routed" then + local tap_config = out_app[2] + tap_config.mtu = config.mtu + end + + local function mk_ipfix_config() + return { active_timeout = config.active_timeout, + idle_timeout = config.idle_timeout, + flush_timeout = config.flush_timeout, + cache_size = config.cache_size, + scan_time = config.scan_time, + observation_domain = config.observation_domain, + template_refresh_interval = + config.template_refresh_interval, + ipfix_version = config.ipfix_version, + exporter_ip = config.exporter_ip, + collector_ip = config.collector_ip, + collector_port = config.collector_port, + mtu = config.mtu - 14, + templates = config.templates, + maps = config.maps, + maps_log_fh = assert(io.open(config.maps_logfile, "a")), + instance = config.instance, + add_packet_metadata = config.add_packet_metadata } + end + + local ipfix_config = mk_ipfix_config() + local ipfix_name = "ipfix"..config.instance + local out_name = "out"..config.instance + local sink_name = "sink"..config.instance + + local graph = in_graph or app_graph.new() + if config.input then + app_graph.app(graph, "in", unpack(in_app)) + app_graph.link(graph, "in." .. in_link.output .. " -> " + ..ipfix_name..".input") + end + app_graph.app(graph, ipfix_name, ipfix.IPFIX, ipfix_config) + app_graph.app(graph, out_name, unpack(out_app)) + + -- use ARP for link-layer concerns unless the output is connected + -- to a pcap writer or a routed tap interface + if (config.output_type ~= "pcap" and + config.output_type ~= "tap_routed") then + local arp_name = "arp"..config.instance + local arp_config = { self_mac = config.exporter_mac and + ethernet:pton(config.exporter_mac), + self_ip = ipv4:pton(config.exporter_ip), + next_ip = ipv4:pton(config.collector_ip) } + app_graph.app(graph, arp_name, arp.ARP, arp_config) + app_graph.app(graph, sink_name, basic.Sink) + + app_graph.link(graph, out_name.."."..out_link.output.." -> " + ..arp_name..".south") + + -- with UDP, ipfix doesn't need to handle packets from the collector + app_graph.link(graph, arp_name..".north -> "..sink_name..".input") + + app_graph.link(graph, ipfix_name..".output -> "..arp_name..".north") + app_graph.link(graph, arp_name..".south -> " + ..out_name.."."..out_link.input) + else + app_graph.link(graph, ipfix_name..".output -> " + ..out_name.."."..out_link.input) + app_graph.app(graph, sink_name, basic.Sink) + app_graph.link(graph, out_name.."."..out_link.output.." -> " + ..sink_name..".input") + end + + engine.configure(graph) + + if config.output_type == "tap_routed" then + local tap_config = out_app[2] + local name = tap_config.name + local tap_sysctl_base = "net/ipv4/conf/"..name + assert(S.sysctl(tap_sysctl_base.."/rp_filter", '0')) + assert(S.sysctl(tap_sysctl_base.."/accept_local", '1')) + assert(S.sysctl(tap_sysctl_base.."/forwarding", '1')) + local out_stats = engine.app_table[out_name].shm + local ipfix_config = mk_ipfix_config() + ipfix_config.exporter_eth_dst = + tostring(macaddress:new(counter.read(out_stats.macaddr))) + app_graph.app(graph, ipfix_name, ipfix.IPFIX, ipfix_config) + engine.configure(graph) + end + + return config, graph +end + +function run (arg, duration, busywait, cpu, jit) + local config = configure_graph(arg) + + if jit then + if jit.v then + require("jit.v").start(jit.v) + end + if jit.p and #jit.p > 0 then + require("jit.p").start(unpack(jit.p)) + end + if jit.traceprof then + require("lib.traceprof.traceprof").start() + end + if jit.dump and #jit.dump > 0 then + require("jit.dump").on(unpack(jit.dump)) + end + if jit.opts and #jit.opts > 0 then + require("jit.opt").start(unpack(jit.opts)) + end + end + + local done + if not duration and config.input_type == "pcap" then + done = function () + return engine.app_table['in'].done + end + end + + local t1 = now() + if cpu then numa.bind_to_cpu(cpu) end + + engine.busywait = busywait + engine.main({ duration = duration, done = done, measure_latency = false }) + + if jit then + if jit.dump then + require("jit.dump").off() + end + if jit.traceprof then + require("lib.traceprof.traceprof").stop() + end + if jit.p then + require("jit.p").stop() + end + if jit.v then + require("jit.v").stop() + end + end + + local t2 = now() + local stats = link.stats(engine.app_table.ipfix.input.input) + print("IPFIX probe stats:") + local comma = lib.comma_value + print(string.format("bytes: %s packets: %s bps: %s Mpps: %s", + comma(stats.rxbytes), + comma(stats.rxpackets), + comma(math.floor((stats.rxbytes * 8) / (t2 - t1))), + comma(stats.rxpackets / ((t2 - t1) * 1000000)))) + +end diff --git a/src/program/ipfix/probe/probe.lua b/src/program/ipfix/probe/probe.lua index a2af992224..c6a8e8d768 100644 --- a/src/program/ipfix/probe/probe.lua +++ b/src/program/ipfix/probe/probe.lua @@ -2,78 +2,7 @@ module(..., package.seeall) -local now = require("core.app").now -local lib = require("core.lib") -local link = require("core.link") -local shm = require("core.shm") -local counter = require("core.counter") -local basic = require("apps.basic.basic_apps") -local arp = require("apps.ipv4.arp") -local ipfix = require("apps.ipfix.ipfix") -local pci = require("lib.hardware.pci") -local ipv4 = require("lib.protocol.ipv4") -local ethernet = require("lib.protocol.ethernet") -local macaddress = require("lib.macaddress") -local numa = require("lib.numa") -local S = require("syscall") - --- apps that can be used as an input or output for the exporter -local in_apps, out_apps = {}, {} - -local function parse_spec (spec, delimiter) - local t = {} - for s in spec:split(delimiter or ':') do - table.insert(t, s) - end - return t -end - -function in_apps.pcap (path) - return { input = "input", - output = "output" }, - { require("apps.pcap.pcap").PcapReader, path } -end - -function out_apps.pcap (path) - return { input = "input", - output = "output" }, - { require("apps.pcap.pcap").PcapWriter, path } -end - -function out_apps.tap_routed (device) - return { input = "input", - output = "output" }, - { require("apps.tap.tap").Tap, { name = device } } -end - -function in_apps.raw (device) - return { input = "rx", - output = "tx" }, - { require("apps.socket.raw").RawSocket, device } -end -out_apps.raw = in_apps.raw - -function in_apps.tap (device) - return { input = "input", - output = "output" }, - { require("apps.tap.tap").Tap, device } -end -out_apps.tap = in_apps.tap - -function in_apps.pci (spec) - local device, rxq = unpack(parse_spec(spec, '/')) - local device_info = pci.device_info(device) - local conf = { pciaddr = device } - if device_info.driver == 'apps.intel_mp.intel_mp' then - local rxq = (rxq and tonumber(rxq)) or 0 - conf.rxq = rxq - conf.rxcounter = rxq - conf.ring_buffer_size = 32768 - end - return { input = device_info.rx, output = device_info.tx }, - { require(device_info.driver).driver, conf } -end -out_apps.pci = in_apps.pci +local lib = require("core.lib") local long_opts = { help = "h", @@ -105,8 +34,7 @@ local long_opts = { function run (args) local duration local busywait = false - local traceprofiling = false - local jit_opts = {} + local jit = { opts = {} } local input_type, output_type = "pci", "pci" @@ -122,7 +50,7 @@ function run (args) local ipfix_version = 10 local templates = {} local maps = {} - local maps_log_fh + local maps_logfile local pfx_to_as, vlan_to_ifindex, mac_to_as @@ -138,11 +66,9 @@ function run (args) duration = assert(tonumber(arg), "expected number for duration") end, i = function (arg) - assert(in_apps[arg], "unknown input type") input_type = arg end, o = function (arg) - assert(out_apps[arg], "unknown output type") output_type = arg end, p = function (arg) @@ -205,7 +131,7 @@ function run (args) end, ["maps-log"] = function (arg) if arg then - maps_log_fh = assert(io.open(arg, "a")) + maps_logfile = arg end end, ipfix = function (arg) @@ -226,22 +152,20 @@ function run (args) if arg:match("^v") then local file = arg:match("^v=(.*)") if file == '' then file = nil end - require("jit.v").start(file) + jit.v = file elseif arg:match("^p") then local opts, file = arg:match("^p=([^,]*),?(.*)") if file == '' then file = nil end - require("jit.p").start(opts, file) - profiling = true + jit.p = { opts, file } elseif arg:match("^dump") then local opts, file = arg:match("^dump=([^,]*),?(.*)") if file == '' then file = nil end - require("jit.dump").on(opts, file) + jit.dump = { opts, file } elseif arg:match("^opt") then local opt = arg:match("^opt=(.*)") - table.insert(jit_opts, opt) + table.insert(jit.opts, opt) elseif arg:match("^tprof") then - require("lib.traceprof.traceprof").start() - traceprofiling = true + jit.traceprof = true end end } @@ -255,107 +179,34 @@ function run (args) table.insert(args, 'v6') end - local in_link, in_app = in_apps[input_type](args[1]) - local out_link, out_app = out_apps[output_type](args[2]) + local input, output = args[1], args[2] for i = 3, #args do table.insert(templates, args[i]) end - local arp_config = { self_mac = host_mac and ethernet:pton(host_mac), - self_ip = ipv4:pton(host_ip), - next_ip = ipv4:pton(collector_ip) } - local function mk_ipfix_config() - return { active_timeout = active_timeout, - idle_timeout = idle_timeout, - flush_timeout = flush_timeout, - cache_size = cache_size, - scan_time = scan_time, - observation_domain = observation_domain, - template_refresh_interval = template_refresh_interval, - ipfix_version = ipfix_version, - exporter_ip = host_ip, - collector_ip = collector_ip, - collector_port = port, - mtu = mtu - 14, - templates = templates, - maps = maps, - maps_log_fh = maps_log_fh } - end - local ipfix_config = mk_ipfix_config() - if output_type == "tap_routed" then - tap_config = out_app[2] - tap_config.mtu = mtu - end - local c = config.new() - - config.app(c, "in", unpack(in_app)) - config.app(c, "ipfix", ipfix.IPFIX, ipfix_config) - config.app(c, "out", unpack(out_app)) - - -- use ARP for link-layer concerns unless the output is connected - -- to a pcap writer - if output_type ~= "pcap" and output_type ~= "tap_routed" then - config.app(c, "arp", arp.ARP, arp_config) - config.app(c, "sink", basic.Sink) - - config.link(c, "in." .. in_link.output .. " -> ipfix.input") - config.link(c, "out." .. out_link.output .. " -> arp.south") - - -- with UDP, ipfix doesn't need to handle packets from the collector - config.link(c, "arp.north -> sink.input") - - config.link(c, "ipfix.output -> arp.north") - config.link(c, "arp.south -> out." .. out_link.input) - else - config.link(c, "in." .. in_link.output .. " -> ipfix.input") - config.link(c, "ipfix.output -> out." .. out_link.input) - config.app(c, "sink", basic.Sink) - config.link(c, "out." .. out_link.output .. " -> sink.input") - end - - local done - if not duration and input_type == "pcap" then - done = function () - return engine.app_table['in'].done - end - end - - local t1 = now() - if cpu then numa.bind_to_cpu(cpu) end - - engine.configure(c) - - if output_type == "tap_routed" then - local tap_config = out_app[2] - local name = tap_config.name - local tap_sysctl_base = "net/ipv4/conf/"..name - assert(S.sysctl(tap_sysctl_base.."/rp_filter", '0')) - assert(S.sysctl(tap_sysctl_base.."/accept_local", '1')) - assert(S.sysctl(tap_sysctl_base.."/forwarding", '1')) - local export_stats = engine.app_table.out.shm - local ipfix_config = mk_ipfix_config() - ipfix_config.exporter_eth_dst = - tostring(macaddress:new(counter.read(export_stats.macaddr))) - config.app(c, "ipfix", ipfix.IPFIX, ipfix_config) - engine.configure(c) - end - engine.busywait = busywait - if #jit_opts then - require("jit.opt").start(unpack(jit_opts)) - end - engine.main({ duration = duration, done = done, measure_latency = false }) - if traceprofiling then - require("lib.traceprof.traceprof").stop() - end + local probe_config = { + active_timeout = active_timeout, + idle_timeout = idle_timeout, + flush_timeout = flush_timeout, + cache_size = cache_size, + scan_time = scan_time, + observation_domain = observation_domain, + template_refresh_interval = template_refresh_interval, + ipfix_version = ipfix_version, + exporter_ip = host_ip, + exporter_mac = host_mac, + collector_ip = collector_ip, + collector_port = port, + mtu = mtu, + templates = templates, + maps = maps, + maps_logfile = maps_logfile, + output_type = output_type, + output = output, + input_type = input_type, + input = input + } - local t2 = now() - local stats = link.stats(engine.app_table.ipfix.input.input) - print("IPFIX probe stats:") - local comma = lib.comma_value - print(string.format("bytes: %s packets: %s bps: %s Mpps: %s", - comma(stats.rxbytes), - comma(stats.rxpackets), - comma(math.floor((stats.rxbytes * 8) / (t2 - t1))), - comma(stats.rxpackets / ((t2 - t1) * 1000000)))) + require("program.ipfix.lib").run(probe_config, duration, busywait, cpu, jit) end diff --git a/src/program/ipfix/stats/stats.lua b/src/program/ipfix/stats/stats.lua index 993f09b450..e4993e3e64 100644 --- a/src/program/ipfix/stats/stats.lua +++ b/src/program/ipfix/stats/stats.lua @@ -14,10 +14,49 @@ local function read(c, cv) return cv and comma_value(n) or n end -local function format(format, ...) +local function format(indent, format, ...) + for i = 1, indent do + io.stdout:write(" ") + end print(string.format(format, ...)) end +local function pci_stats(indent, path) + local indent = indent or 0 + for _, pciaddr in ipairs(shm.children(path.."/pci")) do + local pci = shm.open_frame(path.."/pci/"..pciaddr) + local rxdrops_total = 0 + print() + format(indent, "NIC %s", pciaddr) + local qids = {} + for _, file in ipairs(shm.children(path.."/pci/"..pciaddr)) do + local base = file:match('(q[%d]+_rx_enabled)') + if base then + if read(pci[base]) == 1 then + local i = tonumber(file:match('q([%d]+)_')) + table.insert(qids, i) + end + end + end + table.sort(qids) + for _, i in ipairs(qids) do + local name = "q"..i.."_" + local rxdrops = read(pci[name..'rxdrops']) + format(indent+2, "Queue #"..i) + format(indent+4, "rxpackets %s", read(pci[name..'rxpackets'], true)) + format(indent+4, "rxdrops %s", comma_value(rxdrops)) + rxdrops_total = rxdrops_total + rxdrops + end + local rxpackets = read(pci.rxpackets) + format(indent+2, "Total") + format(indent+4, "rxpackets %s", comma_value(rxpackets)) + format(indent+4, "rxdrops %s (%1.4F%%)", + comma_value(rxdrops_total), + 100*rxdrops_total/rxpackets) + shm.delete_frame(pci) + end +end + local long_opts = { help = "h", } @@ -35,15 +74,42 @@ function run (args) args = lib.dogetopt(args, opt, "hn", long_opts) local pids = {} + local rss = {} if #args == 0 then for _, pid in ipairs(shm.children('/')) do if tonumber(pid) then for _, dir in ipairs(shm.children('/'..pid)) do - if dir == 'templates' then + if dir == 'ipfix_templates' then table.insert(pids, tonumber(pid)) break end end + for _, dir in ipairs(shm.children('/'..pid..'/apps')) do + if dir == 'rss' then + local receivers = {} + for _, link in ipairs(shm.children('/'..pid..'/links')) do + repeat + if not link:match("^rss%..* -> +ipfix") then break end + local receiver = link:match(".* -> +([%w_]+)%.") + local id, type + if receiver:match("^ipfixmp") then + -- Receiver is connected through a + -- multi-process link + id = receiver:match("ipfixmp(%d+)") + type = 'pid' + else + -- Receiver is running in the same process + id = receiver:match("ipfix(%d+)") + type = 'instance' + end + table.insert(receivers, + { id = id, type = type, link = link }) + until true + end + table.insert(rss, { pid = tonumber(pid), + receivers = receivers }) + end + end end end else @@ -56,81 +122,96 @@ function run (args) end table.sort(pids) + table.sort(rss, function (a, b) return a.pid < b.pid end) + + for _, rss in ipairs(rss) do + print() + format(0, "RSS process #%d", rss.pid) + pci_stats(2, '/'..rss.pid) + print() + format(2, "IPFIX receivers") + table.sort(rss.receivers, function (a, b) return a.id < b.id end) + for _, rcv in ipairs(rss.receivers) do + if rcv.type == 'pid' then + format(4, "Process #%d", rcv.id) + else + format(4, "Embedded instance #%d", rcv.id) + end + local stats = shm.open_frame('/'..rss.pid..'/links/'..rcv.link) + local txpackets = read(stats.txpackets) + local txdrop = read(stats.txdrop) + format(6, "txpackets %s", comma_value(txpackets)) + format(6, "txdrop %s (%1.4F%%)", + comma_value(txdrop), + 100*txdrop/txpackets) + shm.delete_frame(stats) + end + end for _, pid in ipairs(pids) do - format("\nIPFIX process #%d", pid) + print() + format(0, "IPFIX process #%d", pid) local base_path = "/"..pid - local ipfix = shm.open_frame(base_path.."/apps/ipfix") - format("Version %d", read(ipfix.version)) - format("Observation domain %d", read(ipfix.observation_domain)) - format("Received packets %s", read(ipfix.received_packets, true)) - format("Ignored packets %s", read(ipfix.ignored_packets, true)) - format("Template packets %s", read(ipfix.template_packets, true)) - format("Sequence number %s", read(ipfix.sequence_number, true)) - shm.delete_frame(ipfix) - for _, pciaddr in ipairs(shm.children(base_path.."/pci")) do - local pci = shm.open_frame(base_path.."/pci/"..pciaddr) - local rxdrops_total = 0 - format("NIC %s", pciaddr) - local qids = {} - for _, file in ipairs(shm.children(base_path.."/pci/"..pciaddr)) do - local base = file:match('(q[%d]+_rx_enabled)') - if base then - if read(pci[base]) == 1 then - local i = tonumber(file:match('q([%d]+)_')) - table.insert(qids, i) - end - end - end - table.sort(qids) - for _, i in ipairs(qids) do - local name = "q"..i.."_" - local rxdrops = read(pci[name..'rxdrops']) - format(" Queue #"..i) - format(" rxpackets %s", read(pci[name..'rxpackets'], true)) - format(" rxdrops %s", comma_value(rxdrops)) - rxdrops_total = rxdrops_total + rxdrops - end - local rxpackets = read(pci.rxpackets) - format(" Total") - format(" rxpackets %s", comma_value(rxpackets)) - format(" rxdrops %s (%1.4F%%)", - comma_value(rxdrops_total), - 100*rxdrops_total/rxpackets) - shm.delete_frame(pci) + if #rss == 0 then + pci_stats(2, base_path) end - local templates = {} - for _, id in ipairs(shm.children(base_path.."/templates")) do - table.insert(templates, tonumber(id)) + local instances = {} + for _, app in ipairs(shm.children(base_path.."/apps")) do + local instance = app:match("^ipfix(%d)$") + if instance then + table.insert(instances, tonumber(instance)) + end end - table.sort(templates) - for _, id in ipairs(templates) do - format("\nTemplate #%d", id) - local template = shm.open_frame(base_path.."/templates/"..id) - format("Processed packets %s", read(template.packets_in, true)) - format("Exported flows %s", read(template.exported_flows, true)) - format("Flow export packets %s", - read(template.flow_export_packets, true)) - local size = read(template.table_size) - local occupancy = read(template.table_occupancy) - local max_disp = read(template.table_max_displacement) - format("Table stats") - format(" Occupancy %s", comma_value(occupancy)) - format(" Size %s", comma_value(size)) - format(" Byte size %s", read(template.table_byte_size, true)) - format(" Load-factor %1.2f", occupancy/size) - format(" Max displacement %d", max_disp) - format(" Last scan time %d", read(template.table_scan_time)) - shm.delete_frame(template) - if shm.exists(base_path.."/templates/"..id.."/stats") then - format("Template-specific stats") - local stats = shm.open_frame(base_path.."/templates/"..id.."/stats") - for name, _ in pairs(stats.specs) do - format(" %-25s %s", name, read(stats[name], true)) + table.sort(instances) + for _, instance in ipairs(instances) do + local ipfix = shm.open_frame(base_path.."/apps/ipfix"..instance) + print() + format(2,"Instance #"..instance) + format(4,"Version %d", read(ipfix.version)) + format(4,"Observation domain %d", read(ipfix.observation_domain)) + format(4,"Received packets %s", read(ipfix.received_packets, true)) + format(4,"Ignored packets %s", read(ipfix.ignored_packets, true)) + format(4,"Template packets %s", read(ipfix.template_packets, true)) + format(4, "Sequence number %s", read(ipfix.sequence_number, true)) + shm.delete_frame(ipfix) + + local templates = {} + local path = base_path.."/ipfix_templates/"..instance + for _, id in ipairs(shm.children(path)) do + table.insert(templates, tonumber(id)) + end + table.sort(templates) + for _, id in ipairs(templates) do + print() + format(4,"Template #%d", id) + local template = shm.open_frame(path.."/"..id) + format(6, "Processed packets %s", + read(template.packets_in, true)) + format(6, "Exported flows %s", + read(template.exported_flows, true)) + format(6, "Flow export packets %s", + read(template.flow_export_packets, true)) + local size = read(template.table_size) + local occupancy = read(template.table_occupancy) + local max_disp = read(template.table_max_displacement) + format(6,"Table stats") + format(8,"Occupancy %s", comma_value(occupancy)) + format(8,"Size %s", comma_value(size)) + format(8,"Byte size %s", read(template.table_byte_size, true)) + format(8,"Load-factor %1.2f", occupancy/size) + format(8,"Max displacement %d", max_disp) + format(8,"Last scan time %d", read(template.table_scan_time)) + shm.delete_frame(template) + if shm.exists(path.."/"..id.."/stats") then + format(6, "Template-specific stats") + local stats = shm.open_frame(path.."/"..id.."/stats") + for name, _ in pairs(stats.specs) do + format(8, "%-25s %s", name, read(stats[name], true)) + end + shm.delete_frame(stats) end - shm.delete_frame(stats) end end end