Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Do a major refactoring of the flow exporter into two apps
This separates the metering code from the export code, which
is a cleaner separation. Currently the flow expiry code is in
the export app, but it's not clear that that's the right place.

It may be more appropriate to put it in the metering app, with
expired flow records going into some kind of queue, perhaps in the
flow cache, for the other app to read out of.
  • Loading branch information
takikawa authored and wingo committed Jul 25, 2017
1 parent 0d11dae commit 0582024
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 221 deletions.
82 changes: 82 additions & 0 deletions src/apps/flow_export/cache.lua
@@ -0,0 +1,82 @@
-- This module implements the FlowCache class, which is used to track
-- flow keys & flow records for IPFIX flow metering and exporting.

module(..., package.seeall)

local ffi = require("ffi")
local ctable = require("lib.ctable")

-- Flow key & flow record FFI type definitions
--
-- see https://www.iana.org/assignments/ipfix/ipfix.xhtml for
-- information on the IEs for the flow key and record
--
-- The C comments next to each field name the IPFIX information
-- element that the field corresponds to.
--
-- struct flow_key carries separate IPv4 and IPv6 address fields plus
-- an is_ipv6 flag, together with the protocol and transport ports.
-- struct flow_record carries the per-flow timestamps, counters and
-- the other exported attributes.
--
-- Both structs are declared packed, so the compiler inserts no
-- implicit alignment padding; the explicit __padding members are part
-- of each layout.
-- NOTE(review): presumably the address fields of the unused address
-- family are left zeroed when a key is built -- confirm against the
-- metering code.
ffi.cdef[[
struct flow_key {
uint32_t src_ipv4; /* sourceIPv4Address */
uint32_t dst_ipv4; /* destinationIPv4Address */
uint32_t src_ipv6_1; /* sourceIPv6Address */
uint32_t src_ipv6_2;
uint32_t src_ipv6_3;
uint32_t src_ipv6_4;
uint32_t dst_ipv6_1; /* destinationIPv6Address */
uint32_t dst_ipv6_2;
uint32_t dst_ipv6_3;
uint32_t dst_ipv6_4;
uint8_t is_ipv6;
uint8_t protocol; /* protocolIdentifier */
uint16_t src_port; /* sourceTransportPort */
uint16_t dst_port; /* destinationTransportPort */
uint16_t __padding;
} __attribute__((packed));

struct flow_record {
uint64_t start_time; /* flowStartMilliseconds */
uint64_t end_time; /* flowEndMilliseconds */
uint64_t pkt_count; /* packetDeltaCount */
uint64_t octet_count; /* octetDeltaCount */
uint32_t ingress; /* ingressInterface */
uint32_t egress; /* egressInterface */
uint32_t src_peer_as; /* bgpPrevAdjacentAsNumber */
uint32_t dst_peer_as; /* bgpNextAdjacentAsNumber */
uint16_t tcp_control; /* tcpControlBits */
uint8_t tos; /* ipClassOfService */
uint8_t __padding;
} __attribute__((packed));
]]

-- FlowCache wraps a ctable that maps flow keys to flow records.
FlowCache = {}

-- Build a flow cache from a configuration table.
--
-- config.cache_size (optional) is the number of flows the cache is
-- sized for; it defaults to 20000. Fails an assertion when config is
-- nil or false.
--
-- The returned object exposes the chosen size as `cache_size` and the
-- backing ctable as `table`.
function FlowCache:new(config)
   assert(config, "expected configuration table")

   -- TODO: compute the default cache value
   --       based on expected flow amounts?
   local cache_size = config.cache_size or 20000

   -- The backing table is sized so that cache_size entries fit within
   -- the maximum occupancy rate.
   local occupancy_rate = 0.4

   local cache = { cache_size = cache_size }
   cache.table = ctable.new({
      key_type = ffi.typeof("struct flow_key"),
      value_type = ffi.typeof("struct flow_record"),
      max_occupancy_rate = occupancy_rate,
      initial_size = math.ceil(cache_size / occupancy_rate),
   })

   return setmetatable(cache, { __index = self })
end

-- Store flow_record under flow_key in the backing table.
-- Returns nothing.
function FlowCache:add(flow_key, flow_record)
   local flows = self.table
   flows:add(flow_key, flow_record)
end

-- Look up flow_key, returning whatever the backing table's
-- lookup_ptr returns for it.
function FlowCache:lookup(flow_key)
   local flows = self.table
   return flows:lookup_ptr(flow_key)
end

-- Return the backing table's iterator, for use in a generic for loop.
function FlowCache:iterate()
   local flows = self.table
   return flows:iterate()
end

-- Remove the entry stored under flow_key from the backing table.
-- Returns nothing.
function FlowCache:remove(flow_key)
   local flows = self.table
   flows:remove(flow_key)
end
212 changes: 18 additions & 194 deletions src/apps/flow_export/flow_export.lua
@@ -1,6 +1,5 @@
-- This module implements the flow exporter app, which records
-- network traffic information and exports it in Netflow v9 format
-- (RFC 3954) or IPFIX format (RFC 3917 and others).
-- This module implements the flow metering app, which records
-- IP flows as part of an IP flow export program.

module(..., package.seeall)

Expand All @@ -17,7 +16,8 @@ local ipv6 = require("lib.protocol.ipv6")
local tcp = require("lib.protocol.tcp")
local C = ffi.C

local htonl, htons = lib.htonl, lib.htons
local htonl, htons = lib.htonl, lib.htons
local get_timestamp = util.get_timestamp

local debug = lib.getenv("FLOW_EXPORT_DEBUG")

Expand All @@ -30,185 +30,15 @@ local IP_PROTO_SCTP = 132

local TCP_CONTROL_BITS_OFFSET = 11

-- see https://www.iana.org/assignments/ipfix/ipfix.xhtml for
-- information on the IEs for the flow key and record
ffi.cdef[[
struct flow_key {
uint32_t src_ipv4; /* sourceIPv4Address */
uint32_t dst_ipv4; /* destinationIPv4Address */
uint32_t src_ipv6_1; /* sourceIPv6Address */
uint32_t src_ipv6_2;
uint32_t src_ipv6_3;
uint32_t src_ipv6_4;
uint32_t dst_ipv6_1; /* destinationIPv6Address */
uint32_t dst_ipv6_2;
uint32_t dst_ipv6_3;
uint32_t dst_ipv6_4;
uint8_t is_ipv6;
uint8_t protocol; /* protocolIdentifier */
uint16_t src_port; /* sourceTransportPort */
uint16_t dst_port; /* destinationTransportPort */
uint16_t __padding;
} __attribute__((packed));

struct flow_record {
uint64_t start_time; /* flowStartMilliseconds */
uint64_t end_time; /* flowEndMilliseconds */
uint64_t pkt_count; /* packetDeltaCount */
uint64_t octet_count; /* octetDeltaCount */
uint32_t ingress; /* ingressInterface */
uint32_t egress; /* egressInterface */
uint32_t src_peer_as; /* bgpPrevAdjacentAsNumber */
uint32_t dst_peer_as; /* bgpNextAdjacentAsNumber */
uint16_t tcp_control; /* tcpControlBits */
uint8_t tos; /* ipClassOfService */
uint8_t __padding;
} __attribute__((packed));
]]

FlowExporter = {}

-- TODO: should be configurable
-- these numbers are placeholders for more realistic ones
-- (and timeouts should perhaps be more fine-grained)
local cache_size = 20000
local export_interval = 60

-- RFC5153 recommends a 10-minute template refresh configurable from
-- 1 minute to 1 day (https://tools.ietf.org/html/rfc5153#section-6.2)
local template_interval = 600

-- Produce a timestamp in milliseconds.
--
-- The value of C.get_unix_time() is multiplied by the LuaJIT 64-bit
-- literal 1000ULL, so the result is an unsigned 64-bit cdata number
-- rather than a plain Lua number.
-- NOTE(review): presumably C.get_unix_time() returns whole seconds
-- since the Unix epoch -- confirm against its C declaration.
local function get_timestamp()
return C.get_unix_time() * 1000ULL
end

-- Emit a debug line describing a flow that is being expired.
--
-- entry is a flow cache entry whose `key` field is read as a flow key;
-- msg is a short tag naming why the flow expired. Does nothing unless
-- the module-level debug flag is set.
local function debug_expire(entry, msg)
   if not debug then
      return
   end

   local flow_key = entry.key

   -- pointer to the first byte of the named flow key field
   local function field_bytes(field)
      return ffi.cast("uint8_t*", flow_key) + ffi.offsetof(flow_key, field)
   end

   local src_ip, dst_ip
   if flow_key.is_ipv6 == 1 then
      src_ip = ipv6:ntop(field_bytes("src_ipv6_1"))
      dst_ip = ipv6:ntop(field_bytes("dst_ipv6_1"))
   else
      src_ip = ipv4:ntop(field_bytes("src_ipv4"))
      dst_ip = ipv4:ntop(field_bytes("dst_ipv4"))
   end

   -- the ports are passed through htons before printing
   local src_port = htons(flow_key.src_port)
   local dst_port = htons(flow_key.dst_port)

   util.fe_debug("expire flow [%s] %s (%d) -> %s (%d) proto: %d",
                 msg, src_ip, src_port, dst_ip, dst_port, flow_key.protocol)
end

-- Walk through flow cache to see if flow records need to be expired.
-- Collect expired records and export them to the collector.
--
-- Returns a function taking the app instance (self). The returned
-- function keeps the time of its last scan in a private upvalue and
-- only scans the cache once at least export_interval has elapsed
-- according to engine.now().
local function init_expire_records()
-- time of the last scan; nil until the returned function first runs
local last_time

return function(self)
local now = tonumber(engine.now())
-- on the very first call this makes the elapsed time zero, so the
-- first scan happens one full export_interval after that call
last_time = last_time or now

if now - last_time >= export_interval then
last_time = now
-- NOTE(review): timestamp comes from get_timestamp(), which is
-- documented as milliseconds -- confirm that idle_timeout and
-- active_timeout are expressed in the same unit
local timestamp = get_timestamp()
-- keys of idle flows, removed from the cache after export
local keys_to_remove = {}
-- records of still-active flows whose counters are reset after export
local timeout_records = {}
-- every entry (idle or active timeout) handed to the exporter
local to_export = {}

-- TODO: Walking the table here is done in serial with flow record
-- updates, but in the future this should be done concurrently
-- in a separate process. i.e., locking required here
for entry in self.flows:iterate() do
local record = entry.value

-- the idle check takes priority: a flow that exceeds both timeouts
-- is treated as idle and removed
if timestamp - record.end_time > self.idle_timeout then
debug_expire(entry, "idle")
table.insert(keys_to_remove, entry.key)
table.insert(to_export, entry)
elseif timestamp - record.start_time > self.active_timeout then
debug_expire(entry, "active")
table.insert(timeout_records, record)
table.insert(to_export, entry)
end
end

-- export runs before the removals and resets below, while the
-- collected entries still hold their current values
self.exporter:export_records(self.output.output, to_export)

-- remove idle timed out flows
for _, key in ipairs(keys_to_remove) do
self.flows:remove(key)
end

-- flows that hit the active timeout stay in the cache with their
-- timers and counters restarted
for _, record in ipairs(timeout_records) do
-- TODO: what should timers reset to?
record.start_time = timestamp
record.end_time = timestamp
record.pkt_count = 0
record.octet_count = 0
end
end
end
end

-- periodically refresh the templates on the collector
local function init_refresh_templates()
local last_time
FlowMeter = {}

return function(self)
local now = tonumber(engine.now())

if not last_time or now - last_time >= template_interval then
self.exporter:send_template_record(self.output.output)
last_time = now
end
end
end

function FlowExporter:new(config)
local params = {
key_type = ffi.typeof("struct flow_key"),
value_type = ffi.typeof("struct flow_record"),
max_occupancy_rate = 0.4,
initial_size = math.ceil(cache_size / 0.4),
}
local o = { flows = ctable.new(params),
idle_timeout = config.idle_timeout or 300,
active_timeout = config.active_timeout or 120,
export_timer = nil,
template_timer = nil,
-- instance of the class that talks to the collector
exporter = ipfix.Exporter:new({
boot_time = get_timestamp(),
version = assert(config.ipfix_version),
mtu = config.mtu,
observation_domain = config.observation_domain or 256,
exporter_mac = assert(config.exporter_mac),
exporter_ip = assert(config.exporter_ip),
-- TODO: use ARP to avoid needing this
collector_mac = assert(config.collector_mac),
collector_ip = assert(config.collector_ip),
collector_port = assert(config.collector_port)
}) }

o.expire_records = init_expire_records()
o.refresh_templates = init_refresh_templates()
function FlowMeter:new(config)
local o = { flows = assert(config.cache) }

return setmetatable(o, { __index = self })
end

function FlowExporter:process_packet(pkt)
function FlowMeter:process_packet(pkt)
-- TODO: using the header libraries for now, but can rewrite this
-- code if it turns out to be too slow
local flow_key = ffi.new("struct flow_key")
Expand Down Expand Up @@ -251,7 +81,7 @@ function FlowExporter:process_packet(pkt)
ffi.cast("uint16_t*", pkt.data + eth_size + ip_size + 2)[0]
end

local lookup_result = self.flows:lookup_ptr(flow_key)
local lookup_result = self.flows:lookup(flow_key)
local flow_record

if lookup_result == nil then
Expand Down Expand Up @@ -286,27 +116,21 @@ function FlowExporter:process_packet(pkt)
packet.free(pkt)
end

function FlowExporter:push()
local input = assert(self.input.input)
function FlowMeter:push()
local input = assert(self.input.input)

while not link.empty(input) do
local pkt = link.receive(input)
self:process_packet(pkt)
end

self:refresh_templates()
self:expire_records()
end

function selftest()
local datagram = require("lib.protocol.datagram")
local cache = require("apps.flow_export.cache")

local nf = FlowExporter:new({ ipfix_version = 10,
exporter_mac = "01:02:03:04:05:06",
exporter_ip = "192.168.0.2",
collector_mac = "09:08:07:06:05:04",
collector_ip = "192.168.0.3",
collector_port = 2100 })
local flows = cache.FlowCache:new({})
local nf = FlowMeter:new({ cache = flows })

-- test helper that processes a packet with some given fields
local function test_packet(is_ipv6, src_ip, dst_ip, src_port, dst_port)
Expand Down Expand Up @@ -351,7 +175,7 @@ function selftest()
test_packet(false, "192.168.1.25", "8.8.8.8", 58342, 53)
test_packet(false, "8.8.8.8", "192.168.1.25", 53, 58342)
test_packet(true, "2001:4860:4860::8888", "2001:db8::ff00:42:8329", 53, 57777)
assert(nf.flows.occupancy == 5, "wrong number of flows")
assert(flows.table.occupancy == 5, "wrong number of flows")

-- do some packets with random data to test that it doesn't interfere
for i=1, 100 do
Expand All @@ -371,7 +195,7 @@ function selftest()
key.src_port = htons(9999)
key.dst_port = htons(80)

local result = nf.flows:lookup_ptr(key)
local result = flows:lookup(key)
assert(result, "key not found")
assert(result.value.pkt_count == 1)

Expand All @@ -389,12 +213,12 @@ function selftest()
key.src_port = htons(53)
key.dst_port = htons(57777)

local result = nf.flows:lookup_ptr(key)
local result = flows:lookup(key)
assert(result, "key not found")
assert(result.value.pkt_count == 1)

-- sanity check
nf.flows:selfcheck()
flows.table:selfcheck()

print("selftest ok")
end

0 comments on commit 0582024

Please sign in to comment.