From 30b7c4f4e8de18d3f789c571d5e41b9a338071c4 Mon Sep 17 00:00:00 2001 From: Alexander Gall Date: Tue, 12 Sep 2017 09:47:44 +0200 Subject: [PATCH] Support VLAN-tagged traffic, IPv6 extension headers The current version of pflua cannot handle VLAN tags at all and IPv6 extenstion headers only in a very limited fashion (the latter is mostly a problem with the underlying pflang language). To work around these issues as well as to simplify address-family independent code in the extractor and accumulator methods of templates, all packets are pre-processed before they are passed to a template's matching function. The pre-processor recognizes 802.1q-tagged packets and arranges that the matching function sees the proper Ethertype. It also detects whether a packet is either unfragmented or is the initial fragment of a fragmented packet (in which case the payload does not contain a L4 header). It also detects all IPv6 extenstion headers that might be present and removes them from the packet such that the matching function always sees a plain IPv6 packet with the fixed header only. Information about the VLAN tag, fragmentation status, location of L3 and L4 headers in the packet as well as whether the packet is truncated or not (e.g. when received from a port-mirror that only copies the beginning of a packet) is added as a "metadata block" just after the packet data for access by the extractor/accumulator methods. --- src/apps/ipfix/ipfix.lua | 31 ++-- src/apps/ipfix/packet-metadata.lua | 256 +++++++++++++++++++++++++++++ src/apps/ipfix/template.lua | 172 ++++++++++--------- 3 files changed, 355 insertions(+), 104 deletions(-) create mode 100644 src/apps/ipfix/packet-metadata.lua diff --git a/src/apps/ipfix/ipfix.lua b/src/apps/ipfix/ipfix.lua index bdc49c58e6..f638d48dfd 100644 --- a/src/apps/ipfix/ipfix.lua +++ b/src/apps/ipfix/ipfix.lua @@ -7,8 +7,8 @@ module(..., package.seeall) local bit = require("bit") local ffi = require("ffi") -local pf = require("pf") local template = require("apps.ipfix.template") +local metadata = require("apps.ipfix.packet_metadata") local lib = require("core.lib") local link = require("core.link") local packet = require("core.packet") @@ -23,6 +23,7 @@ local ctable = require("lib.ctable") local C = ffi.C local htonl, htons = lib.htonl, lib.htons +local metadata_add = metadata.add local debug = lib.getenv("FLOW_EXPORT_DEBUG") @@ -495,21 +496,22 @@ function IPFIX:push() local flow_sets = self.flow_sets local nreadable = link.nreadable(input) counter.add(self.shm.received_packets, nreadable) - for i=1, nreadable do - local pkt = link.receive(input) - local handled = false - for _,set in ipairs(flow_sets) do - if set.match(pkt.data, pkt.length) then - link.transmit(set.incoming, pkt) - handled = true - break + for _,set in ipairs(flow_sets) do + for _ = 1, nreadable do + local p = link.receive(input) + local md = metadata_add(p) + if set.match(md.filter_start, md.filter_length) then + link.transmit(set.incoming, p) + else + link.transmit(input, p) end end - -- Drop packet if it didn't match any flow set. - if not handled then - counter.add(self.shm.ignored_packets) - packet.free(pkt) - end + nreadable = link.nreadable(input) + end + + counter.add(self.shm.ignored_packets, nreadable) + for _ = 1, nreadable do + packet.free(link.receive(input)) end for _,set in ipairs(flow_sets) do set:record_flows(timestamp) end @@ -520,7 +522,6 @@ function IPFIX:push() set:sync_stats() end end - end function selftest() diff --git a/src/apps/ipfix/packet-metadata.lua b/src/apps/ipfix/packet-metadata.lua new file mode 100644 index 0000000000..d62612dad1 --- /dev/null +++ b/src/apps/ipfix/packet-metadata.lua @@ -0,0 +1,256 @@ +module(..., package.seeall) + +local ffi = require("ffi") +local lib = require("core.lib") +local consts = require("apps.lwaftr.constants") + +local ntohs = lib.ntohs +local htons = lib.htons + +local ethertype_ipv4 = consts.ethertype_ipv4 +local ethertype_ipv6 = consts.ethertype_ipv6 +local ethernet_header_size = consts.ethernet_header_size +local o_ipv4_total_length = consts.o_ipv4_total_length +local o_ipv4_ver_and_ihl = consts.o_ipv4_ver_and_ihl +local o_ipv4_flags = consts.o_ipv4_flags +local o_ipv4_proto = consts.o_ipv4_proto +local ipv6_fixed_header_size = consts.ipv6_fixed_header_size +local o_ipv6_payload_len = consts.o_ipv6_payload_len +local o_ipv6_next_header = consts.o_ipv6_next_header + +local uint16_ptr_t = ffi.typeof('uint16_t *') + +local function get_ipv4_total_length(l3) + return ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv4_total_length)[0]) +end + +local function get_ipv4_ihl(l3) + return (bit.band((l3 + o_ipv4_ver_and_ihl)[0], 0x0f)) +end + +local function get_ipv4_offset(l3) + local flags_offset = ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv4_flags)[0]) + return (bit.band(0x1fff, flags_offset)) +end + +local function get_ipv4_protocol(l3) return l3[o_ipv4_proto] end + +local function get_ipv6_payload_length(l3) + return ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv6_payload_len)[0]) +end +local function set_ipv6_payload_length(l3, length) + (ffi.cast(uint16_ptr_t, l3 + o_ipv6_payload_len))[0] = htons(length) +end + +local function get_ipv6_next_header(l3) return l3[o_ipv6_next_header] end +local function set_ipv6_next_header(l3, type) l3[o_ipv6_next_header] = type end + +local function ptr_to(ctype) return ffi.typeof('$*', ctype) end + +local ipv6_ext_hdr_t = ffi.typeof([[ + struct { + uint8_t next_header; + uint8_t length; + uint8_t data[0]; + } __attribute__((packed)) +]]) +local ipv6_ext_hdr_ptr_t = ptr_to(ipv6_ext_hdr_t) + +local ipv6_frag_hdr_t = ffi.typeof([[ + struct { + uint8_t next_header; + uint8_t reserved; + uint16_t offset_flags; + uint32_t identificaton; + } __attribute__((packed)) +]]) +local ipv6_frag_hdr_ptr_t = ptr_to(ipv6_frag_hdr_t) + +local function ipv6_generic_ext_hdr(ptr) + local ext_hdr = ffi.cast(ipv6_ext_hdr_ptr_t, ptr) + local next_header = ext_hdr.next_header + local length = ext_hdr.length + -- Length in unit of 8 byets, does not include the first 8 bytes + return length * 8 + 8, next_header +end + +-- The fragmentation header inspector sets this upvalue as a side +-- effect. Only at most one fragmentation header is expected in a +-- header chain. +local ipv6_frag_offset + +local ipv6_ext_hdr_fns = { + [0] = + -- Hop-by-hop + ipv6_generic_ext_hdr, + [43] = + -- Routing + ipv6_generic_ext_hdr, + [44] = + -- Fragmentation, fixed size (8 bytes) + function(ptr) + local frag_hdr = ffi.cast(ipv6_frag_hdr_ptr_t, ptr) + local next_header = frag_hdr.next_header + ipv6_frag_offset = bit.rshift(ntohs(frag_hdr.offset_flags), 3) + return 8, next_header + end, + [51] = + -- IPSec authentication header RFC4302 + function(ptr) + local ext_hdr = ffi.cast(ipv6_ext_hdr_ptr_t, ptr) + local next_header = ext_hdr.next_header + -- Length of the header in units of 4 bytes minus 2 + local payload_len = ext_hdr.length + return payload_len * 4 - 2, next_header + end, + [59] = + -- No next header + function(ptr) + return 0, 255 + end, + [60] = + -- Destination + ipv6_generic_ext_hdr, + [135] = + -- Mobility RFC6275 + ipv6_generic_ext_hdr, + [139] = + -- HIP RFC7401 + ipv6_generic_ext_hdr, + [140] = + -- Shim6 RFC5533 + ipv6_generic_ext_hdr, +} + +function squash_extension_headers(pkt, l3) + local payload = l3 + ipv6_fixed_header_size + local payload_length = get_ipv6_payload_length(l3) + local ulp = get_ipv6_next_header(l3) + + local next_header = ulp + local ext_hdrs_size = 0 + ipv6_frag_offset = 0 + local ipv6_ext_hdr_fn = ipv6_ext_hdr_fns[next_header] + while ipv6_ext_hdr_fn do + hdr_size, next_header = ipv6_ext_hdr_fn(payload + ext_hdrs_size) + ext_hdrs_size = ext_hdrs_size + hdr_size + if ext_hdrs_size < 0 or ext_hdrs_size > pkt.length then + -- The extension header has lead us out of the packet, bail + -- out and leave the packet unmodified + goto exit + end + ipv6_ext_hdr_fn = ipv6_ext_hdr_fns[next_header] + end + -- All extension headers known to us have been skipped. next_header + -- contains what we consider as the "upper layer protocol". + if ext_hdrs_size > 0 then + pkt.length = pkt.length - ext_hdrs_size + payload_length = payload_length - ext_hdrs_size + ulp = next_header + set_ipv6_next_header(l3, ulp) + set_ipv6_payload_length(l3, payload_length) + ffi.C.memmove(payload, payload + ext_hdrs_size, + payload_length + ffi.sizeof(pkt_meta_data_t)) + end + ::exit:: + return payload_length, ulp +end + +ether_header_t = ffi.typeof([[ + struct { + uint8_t dhost[6]; + uint8_t shost[6]; + union { + struct { + uint16_t type; + } ether; + struct { + uint16_t tpid; + uint16_t tci; + uint16_t type; + } dot1q; + }; + } __attribute__((packed)) +]]) +ether_header_ptr_t = ptr_to(ether_header_t) + +local magic_number = 0x5ABB + +pkt_meta_data_t = ffi.typeof([[ + struct { + uint16_t magic; + /* Actual ethertype for single-tagged frames */ + uint16_t ethertype; + /* vlan == 0 if untagged frame */ + uint16_t vlan; + /* Total size, excluding the L2 header */ + uint16_t total_length; + /* Pointer and length that can be passed directly to a pflua filter */ + uint8_t *filter_start; + uint16_t filter_length; + /* Pointers to the L3 and L4 headers */ + uint8_t *l3; + uint8_t *l4; + uint8_t proto; + /* Fragment offset in units of 8 bytes. Equals 0 if not fragmented + or initial fragment */ + uint8_t frag_offset; + /* Difference between packet length and length + according to the l3 header, negative if the + packet is truncated, == 0 if not. A positive value + would indicate that the packet contains some kind + of padding. This should not occur under normal + circumstances. */ + int16_t length_delta; + } __attribute__((packed)) +]]) +pkt_meta_data_ptr_t = ptr_to(pkt_meta_data_t) + +function get (pkt) + local md = ffi.cast(pkt_meta_data_ptr_t, pkt.data + pkt.length) + assert(md.magic == magic_number) + return md +end + +function add (pkt) + local vlan = 0 + local filter_offset = 0 + local l3_offset = ethernet_header_size + local hdr = ffi.cast(ether_header_ptr_t, pkt.data) + local ethertype = lib.ntohs(hdr.ether.type) + if ethertype == 0x8100 then + ethertype = lib.ntohs(hdr.dot1q.type) + vlan = bit.band(lib.ntohs(hdr.dot1q.tci), 0xFFF) + filter_offset = 4 + l3_offset = l3_offset + filter_offset + end + + local md = ffi.cast(pkt_meta_data_ptr_t, pkt.data + pkt.length) + md.magic = magic_number + md.ethertype = ethertype + md.vlan = vlan + md.filter_start = pkt.data + filter_offset + md.filter_length = pkt.length - filter_offset + local l3 = pkt.data + l3_offset + md.l3 = l3 + + if ethertype == ethertype_ipv4 then + md.total_length = get_ipv4_total_length(l3) + md.l4 = l3 + 4 * get_ipv4_ihl(l3) + md.frag_offset = get_ipv4_offset(l3) + md.proto = get_ipv4_protocol(l3) + elseif ethertype == ethertype_ipv6 then + --Remove all extension headers from the packet and track the + --position of the metadata block + local payload_length, next_header = + squash_extension_headers(pkt, l3) + md = get(pkt) + md.total_length = payload_length + ipv6_fixed_header_size + md.l4 = l3 + ipv6_fixed_header_size + md.frag_offset = ipv6_frag_offset + md.proto = next_header + end + + md.length_delta = pkt.length - l3_offset - md.total_length + return md +end diff --git a/src/apps/ipfix/template.lua b/src/apps/ipfix/template.lua index b6f966a57a..528bb9f61a 100644 --- a/src/apps/ipfix/template.lua +++ b/src/apps/ipfix/template.lua @@ -3,15 +3,20 @@ module(..., package.seeall) -local bit = require("bit") -local ffi = require("ffi") -local pf = require("pf") -local consts = require("apps.lwaftr.constants") -local lib = require("core.lib") +local bit = require("bit") +local ffi = require("ffi") +local pf = require("pf") +local consts = require("apps.lwaftr.constants") +local lib = require("core.lib") +local ctable = require("lib.ctable") +local ethernet = require("lib.protocol.ethernet") +local ipv4 = require("lib.protocol.ipv4") +local metadata = require("apps.ipfix.packet_metadata") local ntohs = lib.ntohs local htonl, htons = lib.htonl, lib.htons local function htonq(v) return bit.bswap(v + 0ULL) end +local metadata_get = metadata.get local function ptr_to(ctype) return ffi.typeof('$*', ctype) end @@ -33,18 +38,12 @@ local transport_proto_p = { -- These constants are taken from the lwaftr constants module, which -- is maybe a bad dependency but sharing code is good -- TODO: move constants somewhere else? lib? -local ethertype_ipv4 = consts.ethertype_ipv4 -local ethertype_ipv6 = consts.ethertype_ipv6 -local ethernet_header_size = consts.ethernet_header_size -local ipv6_fixed_header_size = consts.ipv6_fixed_header_size -local o_ethernet_ethertype = consts.o_ethernet_ethertype -local o_ipv4_total_length = consts.o_ipv4_total_length -local o_ipv4_ver_and_ihl = consts.o_ipv4_ver_and_ihl +local o_ipv4_dscp_and_ecn = consts.o_ipv4_dscp_and_ecn local o_ipv4_proto = consts.o_ipv4_proto local o_ipv4_src_addr = consts.o_ipv4_src_addr local o_ipv4_dst_addr = consts.o_ipv4_dst_addr -local o_ipv6_payload_len = consts.o_ipv6_payload_len -local o_ipv6_next_header = consts.o_ipv6_next_header +local o_icmpv4_msg_type = consts.o_icmpv4_msg_type +local o_icmpv4_msg_code = consts.o_icmpv4_msg_code local o_ipv6_src_addr = consts.o_ipv6_src_addr local o_ipv6_dst_addr = consts.o_ipv6_dst_addr @@ -168,19 +167,17 @@ end local uint16_ptr_t = ffi.typeof('uint16_t *') -local function get_ipv4_ihl(l3) - return bit.band((l3 + o_ipv4_ver_and_ihl)[0], 0x0f) +local function get_ipv4_tos(l3) return l3[o_ipv4_dscp_and_ecn] end +local function get_ipv6_tc(l3) + -- Version, traffic class and first part of flow label + local v_tc_fl = ntohs(ffi.cast(uint16_ptr_t, l3)[0]) + -- Traffic class is bits 4-11 (MSB to LSB) + return (bit.rshift(bit.band(0x0FF0, v_tc_fl), 4)) end -local function get_ipv4_total_length(l3) - return ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv4_total_length)[0]) +local function get_icmp_typecode(l4) + return ntohs(ffi.cast(uint16_ptr_t, l4+o_icmpv4_msg_type)[0]) end -local function get_ipv6_payload_length(l3) - return ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv6_payload_len)[0]) -end - -local function get_ipv4_protocol(l3) return l3[o_ipv4_proto] end -local function get_ipv6_next_header(l3) return l3[o_ipv6_next_header] end local function get_ipv4_src_addr_ptr(l3) return l3 + o_ipv4_src_addr end local function get_ipv4_dst_addr_ptr(l3) return l3 + o_ipv4_dst_addr end @@ -212,27 +209,26 @@ local function get_tcp_flags(l4) return ntohs(ffi.cast(uint16_ptr_t, l4)[6]) end -v4 = make_template_info { - id = 256, - filter = "ip", - keys = { "sourceIPv4Address", - "destinationIPv4Address", - "protocolIdentifier", - "sourceTransportPort", - "destinationTransportPort" }, - values = { "flowStartMilliseconds", - "flowEndMilliseconds", - "packetDeltaCount", - "octetDeltaCount", - "tcpControlBitsReduced" } -} +-- Address-family dependent extractors -local function extract_transport_key(entry, l4) +local function extract_v4_addr(l3, entry) + read_ipv4_src_address(l3, entry.key.sourceIPv4Address) + read_ipv4_dst_address(l3, entry.key.destinationIPv4Address) +end + +local function extract_v6_addr(l3, entry) + read_ipv6_src_address(l3, entry.key.sourceIPv6Address) + read_ipv6_dst_address(l3, entry.key.destinationIPv6Address) +end + +-- Address-family independent extract/accumulate functions + +local function extract_transport_key(l4, entry) entry.key.sourceTransportPort = get_transport_src_port(l4) entry.key.destinationTransportPort = get_transport_dst_port(l4) end -local function extract_tcp_flags(entry, l4) +local function extract_tcp_flags(l4, entry) -- Mask off data offset bits entry.value.tcpControlBits = bit.band(0xFFF, get_tcp_flags(l4)) end @@ -252,39 +248,61 @@ local function accumulate_tcp_flags_reduced(dst, new) new.value.tcpControlBitsReduced) end -function v4.extract(pkt, timestamp, entry) - local l2 = pkt.data - local l3 = l2 + ethernet_header_size - local ihl = get_ipv4_ihl(l3) - local l4 = l3 + ihl * 4 - +-- Clear key and value, extract the 3-tuple, fill in flow start/end +-- times and packet/octet counters. This is the bare minimum any +-- template will need. +local function extract_3_tuple(pkt, timestamp, entry, md, extract_addr_fn) ffi.fill(entry.key, ffi.sizeof(entry.key)) ffi.fill(entry.value, ffi.sizeof(entry.value)) - -- Fill key. - -- FIXME: Try using normal Lua assignment. - read_ipv4_src_address(l3, entry.key.sourceIPv4Address) - read_ipv4_dst_address(l3, entry.key.destinationIPv4Address) - local proto = get_ipv4_protocol(l3) - entry.key.protocolIdentifier = proto - if transport_proto_p[proto] then - extract_transport_key(entry, l4) - end - -- Fill value. + extract_addr_fn(md.l3, entry) + entry.key.protocolIdentifier = md.proto + entry.value.flowStartMilliseconds = timestamp entry.value.flowEndMilliseconds = timestamp entry.value.packetDeltaCount = 1 - entry.value.octetDeltaCount = get_ipv4_total_length(l3) - if proto == IP_PROTO_TCP then - extract_tcp_flags_reduced(entry, l4) + entry.value.octetDeltaCount = md.total_length +end + +local function extract_5_tuple(pkt, timestamp, entry, md, extract_addr_fn) + extract_3_tuple(pkt, timestamp, entry, md, extract_addr_fn) + if transport_proto_p[md.proto] and md.frag_offset == 0 then + extract_transport_key(md.l4, entry) end end -function v4.accumulate(dst, new) +local function accumulate_generic(dst, new) dst.value.flowEndMilliseconds = new.value.flowEndMilliseconds dst.value.packetDeltaCount = dst.value.packetDeltaCount + 1 dst.value.octetDeltaCount = dst.value.octetDeltaCount + new.value.octetDeltaCount +end + +v4 = make_template_info { + id = 256, + filter = "ip", + keys = { "sourceIPv4Address", + "destinationIPv4Address", + "protocolIdentifier", + "sourceTransportPort", + "destinationTransportPort" }, + values = { "flowStartMilliseconds", + "flowEndMilliseconds", + "packetDeltaCount", + "octetDeltaCount", + "tcpControlBitsReduced" } +} + +function v4.extract(pkt, timestamp, entry) + local md = metadata_get(pkt) + extract_5_tuple(pkt, timestamp, entry, md, extract_v4_addr) + if md.proto == IP_PROTO_TCP and md.frag_offset == 0 then + extract_tcp_flags_reduced(md.l4, entry) + end +end + +function v4.accumulate(dst, new) + accumulate_generic(dst, new) if dst.key.protocolIdentifier == IP_PROTO_TCP then accumulate_tcp_flags_reduced(dst, new) end @@ -318,39 +336,15 @@ v6 = make_template_info { } function v6.extract(pkt, timestamp, entry) - local l2 = pkt.data - local l3 = l2 + ethernet_header_size - -- TODO: handle chained headers - local l4 = l3 + ipv6_fixed_header_size - - ffi.fill(entry.key, ffi.sizeof(entry.key)) - ffi.fill(entry.value, ffi.sizeof(entry.value)) - -- Fill key. - -- FIXME: Try using normal Lua assignment. - read_ipv6_src_address(l3, entry.key.sourceIPv6Address) - read_ipv6_dst_address(l3, entry.key.destinationIPv6Address) - local proto = get_ipv6_next_header(l3) - entry.key.protocolIdentifier = proto - if transport_proto_p[proto] then - extract_transport_key(entry, l4) - end - - -- Fill value. - entry.value.flowStartMilliseconds = timestamp - entry.value.flowEndMilliseconds = timestamp - entry.value.packetDeltaCount = 1 - entry.value.octetDeltaCount = get_ipv6_payload_length(l3) - + ipv6_fixed_header_size - if proto == IP_PROTO_TCP then - extract_tcp_flags_reduced(entry, l4) + local md = metadata_get(pkt) + extract_5_tuple(pkt, timestamp, entry, md, extract_v6_addr) + if md.proto == IP_PROTO_TCP and md.frag_offset == 0 then + extract_tcp_flags_reduced(md.l4, entry) end end function v6.accumulate(dst, new) - dst.value.flowEndMilliseconds = new.value.flowEndMilliseconds - dst.value.packetDeltaCount = dst.value.packetDeltaCount + 1 - dst.value.octetDeltaCount = - dst.value.octetDeltaCount + new.value.octetDeltaCount + accumulate_generic(dst, new) if dst.key.protocolIdentifier == IP_PROTO_TCP then accumulate_tcp_flags_reduced(dst, new) end