Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Join IPFIX meter and exporter apps
  • Loading branch information
wingo committed Jul 25, 2017
1 parent 4aa22c5 commit 318bac7
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 94 deletions.
152 changes: 67 additions & 85 deletions src/apps/ipfix/ipfix.lua
Expand Up @@ -437,10 +437,13 @@ function Exporter:flush_data_records(out)
link.transmit(out, pkt)
end

FlowExporter = {}
IPFIX = {}

function FlowExporter:new(config)
local o = { cache = assert(config.cache),
function IPFIX:new(config)
local o = { flows = assert(config.cache),
export_timer = nil,
idle_timeout = config.idle_timeout or 300,
active_timeout = config.active_timeout or 120,
-- sequence number to use for flow packets
sequence_number = 1,
boot_time = util.get_timestamp(),
Expand All @@ -460,6 +463,10 @@ function FlowExporter:new(config)
collector_ip = assert(config.collector_ip),
collector_port = assert(config.collector_port) }

-- Convert from secs to ms (internal timestamp granularity is ms).
o.idle_timeout = o.idle_timeout * 1000
o.active_timeout = o.active_timeout * 1000

if o.version == 9 then
o.header_t = netflow_v9_packet_header_t
elseif o.version == 10 then
Expand All @@ -484,15 +491,15 @@ function FlowExporter:new(config)
return setmetatable(o, { __index = self })
end

function FlowExporter:send_template_records(out)
function IPFIX:send_template_records(out)
local pkt = packet.allocate()
pkt = self.export_v4:append_template_record(pkt)
pkt = self.export_v6:append_template_record(pkt)
add_record_count(pkt, 2)
link.transmit(out, pkt)
end

function FlowExporter:add_ipfix_header(pkt, count)
function IPFIX:add_ipfix_header(pkt, count)
pkt = packet.shiftright(pkt, self.header_size)
local header = ffi.cast(self.header_ptr_t, pkt.data)

Expand All @@ -512,7 +519,7 @@ function FlowExporter:add_ipfix_header(pkt, count)
return pkt
end

function FlowExporter:add_transport_headers (pkt)
function IPFIX:add_transport_headers (pkt)
-- TODO: support IPv6, also obtain the MAC of the dst via ARP
-- and use the correct src MAC (this is ok for use on the
-- loopback device for now).
Expand All @@ -539,51 +546,6 @@ function FlowExporter:add_transport_headers (pkt)
return dgram:packet()
end

function FlowExporter:pull ()
assert(self.output.output, "missing output link")

local outgoing = self.outgoing_messages

if self.next_template_refresh < engine.now() then
self.next_template_refresh = engine.now() + template_interval
self:send_template_records(outgoing)
end

for _, record in ipairs(self.cache.v4:get_expired()) do
self.export_v4:add_data_record(record, outgoing)
end
self.export_v4:flush_data_records(outgoing)

for _, record in ipairs(self.cache.v6:get_expired()) do
self.export_v6:add_data_record(record, outgoing)
end
self.export_v6:flush_data_records(outgoing)

for i=1,link.nreadable(outgoing) do
local pkt = link.receive(outgoing)
pkt = self:add_ipfix_header(pkt, remove_record_count(pkt))
pkt = self:add_transport_headers(pkt)
link.transmit(self.output.output, pkt)
end
end

FlowMeter = {}

function FlowMeter:new(config)
local o = { flows = assert(config.cache),
export_timer = nil,
idle_timeout = config.idle_timeout or 300,
active_timeout = config.active_timeout or 120,
-- used for debugging mainly
boot_time = get_timestamp() }

-- convert from secs to ms (internal timestamp granularity is ms)
o.idle_timeout = o.idle_timeout * 1000
o.active_timeout = o.active_timeout * 1000

return setmetatable(o, { __index = self })
end

local function get_ethernet_n_ethertype(ptr)
return ffi.cast("uint16_t*", ptr + o_ethernet_ethertype)[0]
end
Expand Down Expand Up @@ -622,7 +584,7 @@ local function get_ipv6_traffic_class(ptr)
return bit.band(high, low)
end

function FlowMeter:process_flow(flows, flow_key, flow_record, l4_header)
function IPFIX:process_flow(flows, flow_key, flow_record, l4_header)
-- TCP, UDP, SCTP all have the ports in the same header location
if (flow_key.protocolIdentifier == IP_PROTO_TCP
or flow_key.protocolIdentifier == IP_PROTO_UDP
Expand Down Expand Up @@ -659,7 +621,7 @@ function FlowMeter:process_flow(flows, flow_key, flow_record, l4_header)
end
end

function FlowMeter:process_ipv6_packet(pkt, timestamp)
function IPFIX:process_ipv6_packet(pkt, timestamp)
local l2_header = pkt.data
-- We could warn here.
if get_ethernet_n_ethertype(l2_header) ~= n_ethertype_ipv6 then return end
Expand All @@ -680,10 +642,10 @@ function FlowMeter:process_ipv6_packet(pkt, timestamp)
-- TODO: handle chained headers
local l4_header = l3_header + ipv6_fixed_header_size

FlowMeter:process_flow(flows, flow_key, flow_record, l4_header)
IPFIX:process_flow(flows, flow_key, flow_record, l4_header)
end

function FlowMeter:process_ipv4_packet(pkt, timestamp)
function IPFIX:process_ipv4_packet(pkt, timestamp)
local l2_header = pkt.data
-- We could warn here.
if get_ethernet_n_ethertype(l2_header) ~= n_ethertype_ipv4 then return end
Expand All @@ -705,11 +667,11 @@ function FlowMeter:process_ipv4_packet(pkt, timestamp)
local ihl = get_ipv4_ihl(l3_header)
local l4_header = l3_header + ihl * 4

FlowMeter:process_flow(flows, flow_key, flow_record, l4_header)
IPFIX:process_flow(flows, flow_key, flow_record, l4_header)
end

-- print debugging messages for flow expiration
function FlowMeter:debug_expire(entry, timestamp, msg)
function IPFIX:debug_expire(entry, timestamp, msg)
local ipv4 = require("lib.protocol.ipv4")
local ipv6 = require("lib.protocol.ipv6")

Expand Down Expand Up @@ -745,7 +707,7 @@ end

-- Walk through flow cache to see if flow records need to be expired.
-- Collect expired records and export them to the collector.
function FlowMeter:expire_records()
function IPFIX:expire_records()
for _,proto in ipairs({'v4', 'v6'}) do
local timestamp = get_timestamp()
local keys_to_remove = {}
Expand Down Expand Up @@ -781,7 +743,7 @@ function FlowMeter:expire_records()
end
end

function FlowMeter:push()
function IPFIX:push()
local v4, v6 = self.input.v4, self.input.v6
local timestamp = get_timestamp()

Expand All @@ -802,16 +764,48 @@ function FlowMeter:push()
end

self:expire_records()

assert(self.output.output, "missing output link")

local outgoing = self.outgoing_messages

if self.next_template_refresh < engine.now() then
self.next_template_refresh = engine.now() + template_interval
self:send_template_records(outgoing)
end

for _, record in ipairs(self.flows.v4:get_expired()) do
self.export_v4:add_data_record(record, outgoing)
end
self.export_v4:flush_data_records(outgoing)

for _, record in ipairs(self.flows.v6:get_expired()) do
self.export_v6:add_data_record(record, outgoing)
end
self.export_v6:flush_data_records(outgoing)

for i=1,link.nreadable(outgoing) do
local pkt = link.receive(outgoing)
pkt = self:add_ipfix_header(pkt, remove_record_count(pkt))
pkt = self:add_transport_headers(pkt)
link.transmit(self.output.output, pkt)
end
end

function selftest()
print('selftest: apps.ipfix.ipfix')
local flows = FlowCache:new({})
local nf = FlowMeter:new({ cache = flows })
local ipfix = IPFIX:new({ cache = flows,
ipfix_version = 10,
exporter_mac = "00:11:22:33:44:55",
exporter_ip = "192.168.1.2",
collector_mac = "55:44:33:22:11:00",
collector_ip = "192.168.1.1",
collector_port = 4739 })

-- Mock input and output.
nf.input = { v4 = link.new_anonymous(), v6 = link.new_anonymous() }
nf.output = { output = link.new_anonymous() }
ipfix.input = { v4 = link.new_anonymous(), v6 = link.new_anonymous() }
ipfix.output = { output = link.new_anonymous() }

-- Test helper that supplies a packet with some given fields.
local function test_packet(is_ipv6, src_ip, dst_ip, src_port, dst_port)
Expand Down Expand Up @@ -846,9 +840,9 @@ function selftest()
dg:push(ip)
dg:push(eth)

local input = is_ipv6 and nf.input.v6 or nf.input.v4
local input = is_ipv6 and ipfix.input.v6 or ipfix.input.v4
link.transmit(input, dg:packet())
nf:push()
ipfix:push()
end

-- Populate with some known flows.
Expand Down Expand Up @@ -903,16 +897,6 @@ function selftest()
flows.v4.table:selfcheck()
flows.v6.table:selfcheck()

local flows = FlowCache:new({})
local conf = { cache = flows,
ipfix_version = 10,
exporter_mac = "00:11:22:33:44:55",
exporter_ip = "192.168.1.2",
collector_mac = "55:44:33:22:11:00",
collector_ip = "192.168.1.1",
collector_port = 4739 }
local exporter = FlowExporter:new(conf)

local key = flows.v4.preallocated_key
key.sourceIPv4Address = ipv4:pton("192.168.1.1")
key.destinationIPv4Address = ipv4:pton("192.168.1.25")
Expand All @@ -926,30 +910,28 @@ function selftest()
value.packetDeltaCount = 5
value.octetDeltaCount = 15

local record = exporter.export_v4.template.record_t(key, value)
local record = ipfix.export_v4.template.record_t(key, value)
-- Mock expiry.
function exporter.cache.v4:get_expired()
function ipfix.flows.v4:get_expired()
return { record }
end

-- Mock output.
local test_link = link.new_anonymous()
exporter.output = { output = test_link }

exporter:pull()
-- Template message; no data yet.
assert(link.nreadable(ipfix.output.output) == 1)
ipfix:push()
-- Template message and data message.
assert(link.nreadable(test_link) == 2)
assert(link.nreadable(ipfix.output.output) == 2)

local filter = require("pf").compile_filter([[
udp and dst port 4739 and src net 192.168.1.2 and
dst net 192.168.1.1]])

for i=1,link.nreadable(test_link) do
local p = link.receive(test_link)
for i=1,link.nreadable(ipfix.output.output) do
local p = link.receive(ipfix.output.output)
assert(filter(p.data, p.length), "pf filter failed")
packet.free(p)
end
exporter.output = {}
ipfix.output = {}

print("selftest ok")
end
16 changes: 7 additions & 9 deletions src/program/ipfix/probe/probe.lua
Expand Up @@ -134,8 +134,7 @@ function run (args)
local out_link, out_app = in_out_apps[output_type](args[2])

local flow_cache = ipfix.FlowCache:new({})
local meter_config = { cache = flow_cache }
local exporter_config = { cache = flow_cache,
local ipfix_config = { cache = flow_cache,
active_timeout = active_timeout,
idle_timeout = idle_timeout,
ipfix_version = ipfix_version,
Expand All @@ -147,15 +146,14 @@ function run (args)
local c = config.new()

config.app(c, "source", unpack(in_app))
config.app(c, "sink", unpack(out_app))
config.app(c, "splitter", V4V6.V4V6, {})
config.app(c, "meter", ipfix.FlowMeter, meter_config)
config.app(c, "exporter", ipfix.FlowExporter, exporter_config)
config.app(c, "ipfix", ipfix.IPFIX, ipfix_config)
config.app(c, "sink", unpack(out_app))

config.link(c, "source." .. in_link.output .. " -> splitter.input")
config.link(c, "splitter.v4 -> meter.v4")
config.link(c, "splitter.v6 -> meter.v6")
config.link(c, "exporter.output -> sink." .. out_link.input)
config.link(c, "splitter.v4 -> ipfix.v4")
config.link(c, "splitter.v6 -> ipfix.v6")
config.link(c, "ipfix.output -> sink." .. out_link.input)

local done
if not duration then
Expand All @@ -173,7 +171,7 @@ function run (args)

if report then
local end_time = now()
local app = engine.app_table.meter
local app = engine.app_table.ipfix
local v4_link, v6_link = app.input.v4, app.input.v6
local v4_stats, v6_stats = link.stats(v4_link), link.stats(v6_link)
local stats = { rxbytes = v4_stats.rxbytes + v6_stats.rxbytes,
Expand Down

0 comments on commit 318bac7

Please sign in to comment.