diff --git a/src/apps/ipfix/ipfix.lua b/src/apps/ipfix/ipfix.lua index bdc49c58e6..f638d48dfd 100644 --- a/src/apps/ipfix/ipfix.lua +++ b/src/apps/ipfix/ipfix.lua @@ -7,8 +7,8 @@ module(..., package.seeall) local bit = require("bit") local ffi = require("ffi") -local pf = require("pf") local template = require("apps.ipfix.template") +local metadata = require("apps.ipfix.packet_metadata") local lib = require("core.lib") local link = require("core.link") local packet = require("core.packet") @@ -23,6 +23,7 @@ local ctable = require("lib.ctable") local C = ffi.C local htonl, htons = lib.htonl, lib.htons +local metadata_add = metadata.add local debug = lib.getenv("FLOW_EXPORT_DEBUG") @@ -495,21 +496,22 @@ function IPFIX:push() local flow_sets = self.flow_sets local nreadable = link.nreadable(input) counter.add(self.shm.received_packets, nreadable) - for i=1, nreadable do - local pkt = link.receive(input) - local handled = false - for _,set in ipairs(flow_sets) do - if set.match(pkt.data, pkt.length) then - link.transmit(set.incoming, pkt) - handled = true - break + for _,set in ipairs(flow_sets) do + for _ = 1, nreadable do + local p = link.receive(input) + local md = metadata_add(p) + if set.match(md.filter_start, md.filter_length) then + link.transmit(set.incoming, p) + else + link.transmit(input, p) end end - -- Drop packet if it didn't match any flow set. - if not handled then - counter.add(self.shm.ignored_packets) - packet.free(pkt) - end + nreadable = link.nreadable(input) + end + + counter.add(self.shm.ignored_packets, nreadable) + for _ = 1, nreadable do + packet.free(link.receive(input)) end for _,set in ipairs(flow_sets) do set:record_flows(timestamp) end @@ -520,7 +522,6 @@ function IPFIX:push() set:sync_stats() end end - end function selftest() diff --git a/src/apps/ipfix/packet-metadata.lua b/src/apps/ipfix/packet-metadata.lua new file mode 100644 index 0000000000..d62612dad1 --- /dev/null +++ b/src/apps/ipfix/packet-metadata.lua @@ -0,0 +1,256 @@ +module(..., package.seeall) + +local ffi = require("ffi") +local lib = require("core.lib") +local consts = require("apps.lwaftr.constants") + +local ntohs = lib.ntohs +local htons = lib.htons + +local ethertype_ipv4 = consts.ethertype_ipv4 +local ethertype_ipv6 = consts.ethertype_ipv6 +local ethernet_header_size = consts.ethernet_header_size +local o_ipv4_total_length = consts.o_ipv4_total_length +local o_ipv4_ver_and_ihl = consts.o_ipv4_ver_and_ihl +local o_ipv4_flags = consts.o_ipv4_flags +local o_ipv4_proto = consts.o_ipv4_proto +local ipv6_fixed_header_size = consts.ipv6_fixed_header_size +local o_ipv6_payload_len = consts.o_ipv6_payload_len +local o_ipv6_next_header = consts.o_ipv6_next_header + +local uint16_ptr_t = ffi.typeof('uint16_t *') + +local function get_ipv4_total_length(l3) + return ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv4_total_length)[0]) +end + +local function get_ipv4_ihl(l3) + return (bit.band((l3 + o_ipv4_ver_and_ihl)[0], 0x0f)) +end + +local function get_ipv4_offset(l3) + local flags_offset = ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv4_flags)[0]) + return (bit.band(0x1fff, flags_offset)) +end + +local function get_ipv4_protocol(l3) return l3[o_ipv4_proto] end + +local function get_ipv6_payload_length(l3) + return ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv6_payload_len)[0]) +end +local function set_ipv6_payload_length(l3, length) + (ffi.cast(uint16_ptr_t, l3 + o_ipv6_payload_len))[0] = htons(length) +end + +local function get_ipv6_next_header(l3) return l3[o_ipv6_next_header] end +local function set_ipv6_next_header(l3, type) l3[o_ipv6_next_header] = type end + +local function ptr_to(ctype) return ffi.typeof('$*', ctype) end + +local ipv6_ext_hdr_t = ffi.typeof([[ + struct { + uint8_t next_header; + uint8_t length; + uint8_t data[0]; + } __attribute__((packed)) +]]) +local ipv6_ext_hdr_ptr_t = ptr_to(ipv6_ext_hdr_t) + +local ipv6_frag_hdr_t = ffi.typeof([[ + struct { + uint8_t next_header; + uint8_t reserved; + uint16_t offset_flags; + uint32_t identificaton; + } __attribute__((packed)) +]]) +local ipv6_frag_hdr_ptr_t = ptr_to(ipv6_frag_hdr_t) + +local function ipv6_generic_ext_hdr(ptr) + local ext_hdr = ffi.cast(ipv6_ext_hdr_ptr_t, ptr) + local next_header = ext_hdr.next_header + local length = ext_hdr.length + -- Length in unit of 8 byets, does not include the first 8 bytes + return length * 8 + 8, next_header +end + +-- The fragmentation header inspector sets this upvalue as a side +-- effect. Only at most one fragmentation header is expected in a +-- header chain. +local ipv6_frag_offset + +local ipv6_ext_hdr_fns = { + [0] = + -- Hop-by-hop + ipv6_generic_ext_hdr, + [43] = + -- Routing + ipv6_generic_ext_hdr, + [44] = + -- Fragmentation, fixed size (8 bytes) + function(ptr) + local frag_hdr = ffi.cast(ipv6_frag_hdr_ptr_t, ptr) + local next_header = frag_hdr.next_header + ipv6_frag_offset = bit.rshift(ntohs(frag_hdr.offset_flags), 3) + return 8, next_header + end, + [51] = + -- IPSec authentication header RFC4302 + function(ptr) + local ext_hdr = ffi.cast(ipv6_ext_hdr_ptr_t, ptr) + local next_header = ext_hdr.next_header + -- Length of the header in units of 4 bytes minus 2 + local payload_len = ext_hdr.length + return payload_len * 4 - 2, next_header + end, + [59] = + -- No next header + function(ptr) + return 0, 255 + end, + [60] = + -- Destination + ipv6_generic_ext_hdr, + [135] = + -- Mobility RFC6275 + ipv6_generic_ext_hdr, + [139] = + -- HIP RFC7401 + ipv6_generic_ext_hdr, + [140] = + -- Shim6 RFC5533 + ipv6_generic_ext_hdr, +} + +function squash_extension_headers(pkt, l3) + local payload = l3 + ipv6_fixed_header_size + local payload_length = get_ipv6_payload_length(l3) + local ulp = get_ipv6_next_header(l3) + + local next_header = ulp + local ext_hdrs_size = 0 + ipv6_frag_offset = 0 + local ipv6_ext_hdr_fn = ipv6_ext_hdr_fns[next_header] + while ipv6_ext_hdr_fn do + hdr_size, next_header = ipv6_ext_hdr_fn(payload + ext_hdrs_size) + ext_hdrs_size = ext_hdrs_size + hdr_size + if ext_hdrs_size < 0 or ext_hdrs_size > pkt.length then + -- The extension header has lead us out of the packet, bail + -- out and leave the packet unmodified + goto exit + end + ipv6_ext_hdr_fn = ipv6_ext_hdr_fns[next_header] + end + -- All extension headers known to us have been skipped. next_header + -- contains what we consider as the "upper layer protocol". + if ext_hdrs_size > 0 then + pkt.length = pkt.length - ext_hdrs_size + payload_length = payload_length - ext_hdrs_size + ulp = next_header + set_ipv6_next_header(l3, ulp) + set_ipv6_payload_length(l3, payload_length) + ffi.C.memmove(payload, payload + ext_hdrs_size, + payload_length + ffi.sizeof(pkt_meta_data_t)) + end + ::exit:: + return payload_length, ulp +end + +ether_header_t = ffi.typeof([[ + struct { + uint8_t dhost[6]; + uint8_t shost[6]; + union { + struct { + uint16_t type; + } ether; + struct { + uint16_t tpid; + uint16_t tci; + uint16_t type; + } dot1q; + }; + } __attribute__((packed)) +]]) +ether_header_ptr_t = ptr_to(ether_header_t) + +local magic_number = 0x5ABB + +pkt_meta_data_t = ffi.typeof([[ + struct { + uint16_t magic; + /* Actual ethertype for single-tagged frames */ + uint16_t ethertype; + /* vlan == 0 if untagged frame */ + uint16_t vlan; + /* Total size, excluding the L2 header */ + uint16_t total_length; + /* Pointer and length that can be passed directly to a pflua filter */ + uint8_t *filter_start; + uint16_t filter_length; + /* Pointers to the L3 and L4 headers */ + uint8_t *l3; + uint8_t *l4; + uint8_t proto; + /* Fragment offset in units of 8 bytes. Equals 0 if not fragmented + or initial fragment */ + uint8_t frag_offset; + /* Difference between packet length and length + according to the l3 header, negative if the + packet is truncated, == 0 if not. A positive value + would indicate that the packet contains some kind + of padding. This should not occur under normal + circumstances. */ + int16_t length_delta; + } __attribute__((packed)) +]]) +pkt_meta_data_ptr_t = ptr_to(pkt_meta_data_t) + +function get (pkt) + local md = ffi.cast(pkt_meta_data_ptr_t, pkt.data + pkt.length) + assert(md.magic == magic_number) + return md +end + +function add (pkt) + local vlan = 0 + local filter_offset = 0 + local l3_offset = ethernet_header_size + local hdr = ffi.cast(ether_header_ptr_t, pkt.data) + local ethertype = lib.ntohs(hdr.ether.type) + if ethertype == 0x8100 then + ethertype = lib.ntohs(hdr.dot1q.type) + vlan = bit.band(lib.ntohs(hdr.dot1q.tci), 0xFFF) + filter_offset = 4 + l3_offset = l3_offset + filter_offset + end + + local md = ffi.cast(pkt_meta_data_ptr_t, pkt.data + pkt.length) + md.magic = magic_number + md.ethertype = ethertype + md.vlan = vlan + md.filter_start = pkt.data + filter_offset + md.filter_length = pkt.length - filter_offset + local l3 = pkt.data + l3_offset + md.l3 = l3 + + if ethertype == ethertype_ipv4 then + md.total_length = get_ipv4_total_length(l3) + md.l4 = l3 + 4 * get_ipv4_ihl(l3) + md.frag_offset = get_ipv4_offset(l3) + md.proto = get_ipv4_protocol(l3) + elseif ethertype == ethertype_ipv6 then + --Remove all extension headers from the packet and track the + --position of the metadata block + local payload_length, next_header = + squash_extension_headers(pkt, l3) + md = get(pkt) + md.total_length = payload_length + ipv6_fixed_header_size + md.l4 = l3 + ipv6_fixed_header_size + md.frag_offset = ipv6_frag_offset + md.proto = next_header + end + + md.length_delta = pkt.length - l3_offset - md.total_length + return md +end diff --git a/src/apps/ipfix/template.lua b/src/apps/ipfix/template.lua index b6f966a57a..528bb9f61a 100644 --- a/src/apps/ipfix/template.lua +++ b/src/apps/ipfix/template.lua @@ -3,15 +3,20 @@ module(..., package.seeall) -local bit = require("bit") -local ffi = require("ffi") -local pf = require("pf") -local consts = require("apps.lwaftr.constants") -local lib = require("core.lib") +local bit = require("bit") +local ffi = require("ffi") +local pf = require("pf") +local consts = require("apps.lwaftr.constants") +local lib = require("core.lib") +local ctable = require("lib.ctable") +local ethernet = require("lib.protocol.ethernet") +local ipv4 = require("lib.protocol.ipv4") +local metadata = require("apps.ipfix.packet_metadata") local ntohs = lib.ntohs local htonl, htons = lib.htonl, lib.htons local function htonq(v) return bit.bswap(v + 0ULL) end +local metadata_get = metadata.get local function ptr_to(ctype) return ffi.typeof('$*', ctype) end @@ -33,18 +38,12 @@ local transport_proto_p = { -- These constants are taken from the lwaftr constants module, which -- is maybe a bad dependency but sharing code is good -- TODO: move constants somewhere else? lib? -local ethertype_ipv4 = consts.ethertype_ipv4 -local ethertype_ipv6 = consts.ethertype_ipv6 -local ethernet_header_size = consts.ethernet_header_size -local ipv6_fixed_header_size = consts.ipv6_fixed_header_size -local o_ethernet_ethertype = consts.o_ethernet_ethertype -local o_ipv4_total_length = consts.o_ipv4_total_length -local o_ipv4_ver_and_ihl = consts.o_ipv4_ver_and_ihl +local o_ipv4_dscp_and_ecn = consts.o_ipv4_dscp_and_ecn local o_ipv4_proto = consts.o_ipv4_proto local o_ipv4_src_addr = consts.o_ipv4_src_addr local o_ipv4_dst_addr = consts.o_ipv4_dst_addr -local o_ipv6_payload_len = consts.o_ipv6_payload_len -local o_ipv6_next_header = consts.o_ipv6_next_header +local o_icmpv4_msg_type = consts.o_icmpv4_msg_type +local o_icmpv4_msg_code = consts.o_icmpv4_msg_code local o_ipv6_src_addr = consts.o_ipv6_src_addr local o_ipv6_dst_addr = consts.o_ipv6_dst_addr @@ -168,19 +167,17 @@ end local uint16_ptr_t = ffi.typeof('uint16_t *') -local function get_ipv4_ihl(l3) - return bit.band((l3 + o_ipv4_ver_and_ihl)[0], 0x0f) +local function get_ipv4_tos(l3) return l3[o_ipv4_dscp_and_ecn] end +local function get_ipv6_tc(l3) + -- Version, traffic class and first part of flow label + local v_tc_fl = ntohs(ffi.cast(uint16_ptr_t, l3)[0]) + -- Traffic class is bits 4-11 (MSB to LSB) + return (bit.rshift(bit.band(0x0FF0, v_tc_fl), 4)) end -local function get_ipv4_total_length(l3) - return ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv4_total_length)[0]) +local function get_icmp_typecode(l4) + return ntohs(ffi.cast(uint16_ptr_t, l4+o_icmpv4_msg_type)[0]) end -local function get_ipv6_payload_length(l3) - return ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv6_payload_len)[0]) -end - -local function get_ipv4_protocol(l3) return l3[o_ipv4_proto] end -local function get_ipv6_next_header(l3) return l3[o_ipv6_next_header] end local function get_ipv4_src_addr_ptr(l3) return l3 + o_ipv4_src_addr end local function get_ipv4_dst_addr_ptr(l3) return l3 + o_ipv4_dst_addr end @@ -212,27 +209,26 @@ local function get_tcp_flags(l4) return ntohs(ffi.cast(uint16_ptr_t, l4)[6]) end -v4 = make_template_info { - id = 256, - filter = "ip", - keys = { "sourceIPv4Address", - "destinationIPv4Address", - "protocolIdentifier", - "sourceTransportPort", - "destinationTransportPort" }, - values = { "flowStartMilliseconds", - "flowEndMilliseconds", - "packetDeltaCount", - "octetDeltaCount", - "tcpControlBitsReduced" } -} +-- Address-family dependent extractors -local function extract_transport_key(entry, l4) +local function extract_v4_addr(l3, entry) + read_ipv4_src_address(l3, entry.key.sourceIPv4Address) + read_ipv4_dst_address(l3, entry.key.destinationIPv4Address) +end + +local function extract_v6_addr(l3, entry) + read_ipv6_src_address(l3, entry.key.sourceIPv6Address) + read_ipv6_dst_address(l3, entry.key.destinationIPv6Address) +end + +-- Address-family independent extract/accumulate functions + +local function extract_transport_key(l4, entry) entry.key.sourceTransportPort = get_transport_src_port(l4) entry.key.destinationTransportPort = get_transport_dst_port(l4) end -local function extract_tcp_flags(entry, l4) +local function extract_tcp_flags(l4, entry) -- Mask off data offset bits entry.value.tcpControlBits = bit.band(0xFFF, get_tcp_flags(l4)) end @@ -252,39 +248,61 @@ local function accumulate_tcp_flags_reduced(dst, new) new.value.tcpControlBitsReduced) end -function v4.extract(pkt, timestamp, entry) - local l2 = pkt.data - local l3 = l2 + ethernet_header_size - local ihl = get_ipv4_ihl(l3) - local l4 = l3 + ihl * 4 - +-- Clear key and value, extract the 3-tuple, fill in flow start/end +-- times and packet/octet counters. This is the bare minimum any +-- template will need. +local function extract_3_tuple(pkt, timestamp, entry, md, extract_addr_fn) ffi.fill(entry.key, ffi.sizeof(entry.key)) ffi.fill(entry.value, ffi.sizeof(entry.value)) - -- Fill key. - -- FIXME: Try using normal Lua assignment. - read_ipv4_src_address(l3, entry.key.sourceIPv4Address) - read_ipv4_dst_address(l3, entry.key.destinationIPv4Address) - local proto = get_ipv4_protocol(l3) - entry.key.protocolIdentifier = proto - if transport_proto_p[proto] then - extract_transport_key(entry, l4) - end - -- Fill value. + extract_addr_fn(md.l3, entry) + entry.key.protocolIdentifier = md.proto + entry.value.flowStartMilliseconds = timestamp entry.value.flowEndMilliseconds = timestamp entry.value.packetDeltaCount = 1 - entry.value.octetDeltaCount = get_ipv4_total_length(l3) - if proto == IP_PROTO_TCP then - extract_tcp_flags_reduced(entry, l4) + entry.value.octetDeltaCount = md.total_length +end + +local function extract_5_tuple(pkt, timestamp, entry, md, extract_addr_fn) + extract_3_tuple(pkt, timestamp, entry, md, extract_addr_fn) + if transport_proto_p[md.proto] and md.frag_offset == 0 then + extract_transport_key(md.l4, entry) end end -function v4.accumulate(dst, new) +local function accumulate_generic(dst, new) dst.value.flowEndMilliseconds = new.value.flowEndMilliseconds dst.value.packetDeltaCount = dst.value.packetDeltaCount + 1 dst.value.octetDeltaCount = dst.value.octetDeltaCount + new.value.octetDeltaCount +end + +v4 = make_template_info { + id = 256, + filter = "ip", + keys = { "sourceIPv4Address", + "destinationIPv4Address", + "protocolIdentifier", + "sourceTransportPort", + "destinationTransportPort" }, + values = { "flowStartMilliseconds", + "flowEndMilliseconds", + "packetDeltaCount", + "octetDeltaCount", + "tcpControlBitsReduced" } +} + +function v4.extract(pkt, timestamp, entry) + local md = metadata_get(pkt) + extract_5_tuple(pkt, timestamp, entry, md, extract_v4_addr) + if md.proto == IP_PROTO_TCP and md.frag_offset == 0 then + extract_tcp_flags_reduced(md.l4, entry) + end +end + +function v4.accumulate(dst, new) + accumulate_generic(dst, new) if dst.key.protocolIdentifier == IP_PROTO_TCP then accumulate_tcp_flags_reduced(dst, new) end @@ -318,39 +336,15 @@ v6 = make_template_info { } function v6.extract(pkt, timestamp, entry) - local l2 = pkt.data - local l3 = l2 + ethernet_header_size - -- TODO: handle chained headers - local l4 = l3 + ipv6_fixed_header_size - - ffi.fill(entry.key, ffi.sizeof(entry.key)) - ffi.fill(entry.value, ffi.sizeof(entry.value)) - -- Fill key. - -- FIXME: Try using normal Lua assignment. - read_ipv6_src_address(l3, entry.key.sourceIPv6Address) - read_ipv6_dst_address(l3, entry.key.destinationIPv6Address) - local proto = get_ipv6_next_header(l3) - entry.key.protocolIdentifier = proto - if transport_proto_p[proto] then - extract_transport_key(entry, l4) - end - - -- Fill value. - entry.value.flowStartMilliseconds = timestamp - entry.value.flowEndMilliseconds = timestamp - entry.value.packetDeltaCount = 1 - entry.value.octetDeltaCount = get_ipv6_payload_length(l3) - + ipv6_fixed_header_size - if proto == IP_PROTO_TCP then - extract_tcp_flags_reduced(entry, l4) + local md = metadata_get(pkt) + extract_5_tuple(pkt, timestamp, entry, md, extract_v6_addr) + if md.proto == IP_PROTO_TCP and md.frag_offset == 0 then + extract_tcp_flags_reduced(md.l4, entry) end end function v6.accumulate(dst, new) - dst.value.flowEndMilliseconds = new.value.flowEndMilliseconds - dst.value.packetDeltaCount = dst.value.packetDeltaCount + 1 - dst.value.octetDeltaCount = - dst.value.octetDeltaCount + new.value.octetDeltaCount + accumulate_generic(dst, new) if dst.key.protocolIdentifier == IP_PROTO_TCP then accumulate_tcp_flags_reduced(dst, new) end