diff --git a/src/apps/ipfix/ipfix-information-elements-local.inc b/src/apps/ipfix/ipfix-information-elements-local.inc
index 33082a052e..8e3c2ea15c 100644
--- a/src/apps/ipfix/ipfix-information-elements-local.inc
+++ b/src/apps/ipfix/ipfix-information-elements-local.inc
@@ -13,3 +13,5 @@ this is the name of the first non-CNAME answer record or the name of the last CN
 2946:110,dnsAnswerRdata,octetArray,"On-the-wire encoding of the answer record's rdata section. For well-known record types, compressed domain names have been replaced with their uncompressed counterparts",,,,,,,,
 2946:111,dnsAnswerRdataLen,unsigned16,,,,,,,,,
+39499:338,tlsSNI,string,DNS name from the TLS Server Name Indication extension,,,,,,,,
+39499:339,tlsSNILength,unsigned16,Length of tlsSNI in bytes,,,,,,,,
diff --git a/src/apps/ipfix/ipfix.lua b/src/apps/ipfix/ipfix.lua
index 522f2cfa0e..dcc7b0a91c 100644
--- a/src/apps/ipfix/ipfix.lua
+++ b/src/apps/ipfix/ipfix.lua
@@ -120,6 +120,17 @@ end
 
 FlowSet = {}
 
+local function create_maps(template, maps_in)
+   for _, name in ipairs(template.require_maps) do
+      assert(maps_in[name],
+             string.format("Template #%d: required map %s "
+                              .."not configured", template.id, name))
+      template.maps[name] = maps.mk_map(name, maps_in[name],
+                                        nil, template.maps_log_fh,
+                                        template.logger)
+   end
+end
+
 function FlowSet:new (spec, args)
    local t = {}
    for s in spec:split(':') do
@@ -143,13 +154,9 @@ function FlowSet:new (spec, args)
                            .." IPFIX template #"..template.id })
    template.name = template_name
    template.maps = {}
-   for _, name in ipairs(template.require_maps) do
-      assert(args.maps[name],
-             string.format("Template #%d: required map %s "
-                              .."not configured", template.id, name))
-      template.maps[name] = maps.mk_map(name, args.maps[name],
-                                        nil, args.maps_log_fh)
-   end
+   template.maps_log_fh = args.maps_logfile and
+      assert(io.open(args.maps_logfile, "a")) or nil
+   create_maps(template, args.maps)
 
    assert(args.active_timeout > args.scan_time,
           string.format("Template #%d: active timeout (%d) "
@@ -437,8 +444,8 @@ function FlowSet:suppress_flow(flow_entry, timestamp)
       local fps = aggr.flow_count/interval
       local drop_interval = (timestamp - aggr.tstamp_drop_start)/1000
       if (fps >= config.threshold_rate) then
-        local aggr_ppf = aggr.packets/aggr.flow_count
-        local aggr_bpp = aggr.octets/aggr.packets
+         local aggr_ppf = aggr.packets/aggr.flow_count
+         local aggr_bpp = aggr.octets/aggr.packets
          if aggr.suppress == 0 then
             self.template.logger:log(
               string.format("Flow rate threshold exceeded from %s: "..
@@ -481,9 +488,9 @@ function FlowSet:suppress_flow(flow_entry, timestamp)
         flow_entry.value.octetDeltaCount
   end
   if config.drop and aggr.suppress == 1 then
-     -- NB: this rate-limiter applies to flows from *all*
-     -- aggregates, while the threshold rate applies to each
-     -- aggregate individually.
+      -- NB: this rate-limiter applies to flows from *all*
+      -- aggregates, while the threshold rate applies to each
+      -- aggregate individually.
      if self.sp.export_rate_tb:take(1) then
         aggr.exports = aggr.exports + 1
         return false
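The `self.sp.export_rate_tb:take(1)` call above consults a token bucket that is shared by all aggregates, exactly as the NB comment says. As a hedged illustration of the semantics only (the real implementation lives in Snabb's `lib.token_bucket`; the names below are illustrative):

```lua
-- Minimal token-bucket sketch: `rate` tokens accrue per second up to
-- `burst`; take(n, now) succeeds while tokens remain. One shared
-- bucket caps exports globally, however many aggregates are over
-- their individual threshold.
local TokenBucket = {}
TokenBucket.__index = TokenBucket

function TokenBucket.new (rate, burst)
   burst = burst or rate
   return setmetatable({ rate = rate, burst = burst,
                         tokens = burst, tstamp = 0 }, TokenBucket)
end

function TokenBucket:take (n, now)
   self.tokens = math.min(self.burst,
                          self.tokens + (now - self.tstamp) * self.rate)
   self.tstamp = now
   if self.tokens >= n then
      self.tokens = self.tokens - n
      return true
   end
   return false
end

local tb = TokenBucket.new(100)   -- at most 100 exports per second
assert(tb:take(1, 0.0))           -- the bucket starts full
```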
@@ -712,16 +719,27 @@ function IPFIX:reconfig(config)
       flush_timeout = config.flush_timeout,
       parent = self,
       maps = config.maps,
-      maps_log_fh = config.maps_logfile and
-         assert(io.open(config.maps_logfile, "a")) or nil,
+      maps_logfile = config.maps_logfile,
       instance = config.instance,
       log_date = config.log_date
    }
+   -- Eventually, we'd like to perform reconfiguration with as little
+   -- impact as possible. In particular, we want to avoid re-allocating
+   -- the flow table unnecessarily. For now, we only handle the case
+   -- where one of the mapping files changes, since this is fairly
+   -- common and easy to do on the fly.
    local flow_set_args_changed = not lib.equal(self.flow_set_args, flow_set_args)
+   local flow_set_args_changed_basic = flow_set_args_changed
+   if self.flow_set_args and flow_set_args_changed then
+      local save = flow_set_args.maps
+      flow_set_args.maps = self.flow_set_args.maps
+      flow_set_args_changed_basic = not lib.equal(self.flow_set_args, flow_set_args)
+      flow_set_args.maps = save
+   end
    self.flow_set_args = flow_set_args
 
    for i, template in ipairs(self.templates) do
-      if template ~= config.templates[i] or flow_set_args_changed then
+      if template ~= config.templates[i] or flow_set_args_changed_basic then
         self.flow_sets[i] = nil
      end
   end
@@ -733,6 +751,9 @@ function IPFIX:reconfig(config)
      else
         self.logger:log("Added template "..self.flow_sets[i]:id())
      end
+   elseif flow_set_args_changed then
+      create_maps(self.flow_sets[i].template, config.maps)
+      self.logger:log("Updated maps for template "..self.flow_sets[i]:id())
   else
      self.logger:log("Kept template "..self.flow_sets[i]:id())
   end
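The double comparison in `reconfig` above is what decides between a full flow-set rebuild and an in-place map reload. Distilled into a helper (a sketch; `changed_ignoring` is not part of the codebase, and `lib.equal` is Snabb's deep structural comparison from `core.lib`):

```lua
local lib = require("core.lib")

-- Return true if `new` differs from `old` in any field other than
-- `field`: temporarily neutralize the field, compare, restore.
local function changed_ignoring (old, new, field)
   local save = new[field]
   new[field] = old[field]
   local equal = lib.equal(old, new)
   new[field] = save
   return not equal
end
```

`flow_set_args_changed_basic` is then `changed_ignoring(self.flow_set_args, flow_set_args, 'maps')`: true only when something other than the maps changed, which is the case that still forces a rebuild.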
diff --git a/src/apps/ipfix/maps.lua b/src/apps/ipfix/maps.lua
index 50c17731a2..1b5116ec67 100644
--- a/src/apps/ipfix/maps.lua
+++ b/src/apps/ipfix/maps.lua
@@ -8,6 +8,7 @@
 local ipv4 = require("lib.protocol.ipv4")
 local ipv6 = require("lib.protocol.ipv6")
 local poptrie = require("lib.poptrie")
 local logger = require("lib.logger")
+local S = require("syscall")
 
 -- Map MAC addresses to peer AS number
 --
@@ -17,7 +18,7 @@
 local mac_to_as_key_t = ffi.typeof("uint8_t[6]")
 local mac_to_as_value_t = ffi.typeof("uint32_t")
 
-local function make_mac_to_as_map(name)
+local function make_mac_to_as_map(name, template_logger)
    local table = ctable.new({ key_type = mac_to_as_key_t,
                               value_type = mac_to_as_value_t,
                               initial_size = 15000,
@@ -26,16 +27,19 @@ local function make_mac_to_as_map(name)
    local value = mac_to_as_value_t()
    for line in assert(io.lines(name)) do
       local as, mac = line:match("^%s*(%d*)-([0-9a-fA-F:]*)")
-      assert(as and mac, "MAC-to-AS map: invalid line: "..line)
-      local key, value = ethernet:pton(mac), tonumber(as)
-      local result = table:lookup_ptr(key)
-      if result then
-         if result.value ~= value then
-            print("MAC-to-AS map: amibguous mapping: "
-                  ..ethernet:ntop(key)..": "..result.value..", "..value)
-         end
+      if not (as and mac) then
+         template_logger:log("MAC-to-AS map: invalid line: "..line)
+      else
+         local key, value = ethernet:pton(mac), tonumber(as)
+         local result = table:lookup_ptr(key)
+         if result then
+            if result.value ~= value then
+               template_logger:log("MAC-to-AS map: ambiguous mapping: "
+                                   ..ethernet:ntop(key)..": "..result.value..", "..value)
+            end
+         end
+         table:add(key, value, true)
       end
-      table:add(key, value, true)
    end
    return table
 end
@@ -52,16 +56,18 @@ end
 -- elements is relevant, depending on the direction of the flow.  File
 -- format:
 --
 --   <vlan>-<ingress>-<egress>
-local function make_vlan_to_ifindex_map(name)
+local function make_vlan_to_ifindex_map(name, template_logger)
    local table = {}
    for line in assert(io.lines(name)) do
       local vlan, ingress, egress = line:match("^(%d+)-(%d+)-(%d+)$")
-      assert(vlan and ingress and egress,
-             "VLAN-to-IFIndex map: invalid line: "..line)
-      table[tonumber(vlan)] = {
-         ingress = tonumber(ingress),
-         egress = tonumber(egress)
-      }
+      if not (vlan and ingress and egress) then
+         template_logger:log("VLAN-to-IFIndex map: invalid line: "..line)
+      else
+         table[tonumber(vlan)] = {
+            ingress = tonumber(ingress),
+            egress = tonumber(egress)
+         }
+      end
    end
    return table
 end
@@ -74,17 +80,20 @@ end
 -- authoritative data from the RIRs.  This parser supports the format
 -- used by the Geo2Lite database provided by MaxMind:
 -- http://geolite.maxmind.com/download/geoip/database/GeoLite2-ASN-CSV.zip
-local function make_pfx_to_as_map(name, proto)
+local function make_pfx_to_as_map(name, proto, template_logger)
    local table = { pt = poptrie.new{direct_pointing=true,
                                     leaf_t=ffi.typeof("uint32_t")} }
+   local max_plen
    if proto == ipv4 then
       function table:search_bytes (a)
          return self.pt:lookup32(a)
      end
+      max_plen = 32
   elseif proto == ipv6 then
      function table:search_bytes (a)
         return self.pt:lookup128(a)
      end
+      max_plen = 128
   else
      error("Proto must be ipv4 or ipv6")
   end
@@ -92,10 +101,18 @@ local function make_pfx_to_as_map(name, proto)
       if not line:match("^network") then
          local cidr, asn = line:match("([^,]*),(%d+),")
          asn = tonumber(asn)
-         assert(cidr and asn, "Prefix-to-AS map: invalid line: "..line)
-         assert(asn > 0 and asn < 2^32, "Prefix-to-AS map: asn out of range: "..asn)
-         local pfx, len = proto:pton_cidr(cidr)
-         table.pt:add(pfx, len, asn)
+         if not (cidr and asn) then
+            template_logger:log("Prefix-to-AS map: invalid line: "..line)
+         elseif not (asn > 0 and asn < 2^32) then
+            template_logger:log("Prefix-to-AS map: asn out of range: "..line)
+         else
+            local pfx, len = proto:pton_cidr(cidr)
+            if pfx and len <= max_plen then
+               table.pt:add(pfx, len, asn)
+            else
+               template_logger:log("Prefix-to-AS map: invalid address: "..line)
+            end
+         end
      end
   end
   table.pt:build()
@@ -112,25 +129,36 @@ local map_info = {
       logger_module = 'VLAN to ifIndex mapper'
    },
    pfx4_to_as = {
-      create_fn = function (name) return make_pfx_to_as_map(name, ipv4) end,
+      create_fn = function (name, tmpl_logger)
+         return make_pfx_to_as_map(name, ipv4, tmpl_logger)
+      end,
       logger_module = 'IPv4 prefix to AS mapper'
    },
    pfx6_to_as = {
-      create_fn = function (name) return make_pfx_to_as_map(name, ipv6) end,
+      create_fn = function (name, tmpl_logger)
+         return make_pfx_to_as_map(name, ipv6, tmpl_logger)
+      end,
       logger_module = 'IPv6 prefix to AS mapper'
    }
 }
 
 local maps = {}
 
-function mk_map(name, file, log_rate, log_fh)
+function mk_map(name, file, log_rate, log_fh, template_logger)
    local info = assert(map_info[name])
-   local map = maps[name]
-   if not map then
-      map = info.create_fn(file)
-      maps[name] = map
+   local stat = assert(S.stat(file))
+   local map_cache = maps[name]
+   if not map_cache or map_cache.ctime ~= stat.ctime then
+      map_cache = {
+         map = info.create_fn(file, template_logger),
+         ctime = stat.ctime
+      }
+      maps[name] = map_cache
+      template_logger:log("Created "..name.." map from "..file)
+   else
+      template_logger:log("Using cache for map "..name)
    end
-   local map = { map = map }
+   local map = { map = map_cache.map }
    if log_fh then
       map.logger = logger.new({ rate = log_rate or 0.05,
                                 fh = log_fh,
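The new caching logic in `mk_map` keys the cache on the map name and invalidates it when the file's change time moves, using the same ljsyscall `S.stat` call as above. Reduced to its essentials (a sketch, not the module's API):

```lua
local S = require("syscall")

local cache = {}

-- Rebuild the cached object only when `file` has a different ctime
-- than the cached copy; otherwise reuse it.
local function cached (name, file, create_fn)
   local stat = assert(S.stat(file))
   local entry = cache[name]
   if not entry or entry.ctime ~= stat.ctime then
      entry = { map = create_fn(file), ctime = stat.ctime }
      cache[name] = entry
   end
   return entry.map
end
```

Note that ctime also changes on an atomic rename-and-replace, which is how configuration management tools typically update such files, so the cache is refreshed in that common case too.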
diff --git a/src/apps/ipfix/template.lua b/src/apps/ipfix/template.lua
index bd37534d6b..0bb34b6225 100644
--- a/src/apps/ipfix/template.lua
+++ b/src/apps/ipfix/template.lua
@@ -15,6 +15,7 @@ local ipv6 = require("lib.protocol.ipv6")
 local metadata = require("apps.rss.metadata")
 local strings = require("apps.ipfix.strings")
 local dns = require("apps.ipfix.dns")
+local tls = require("apps.ipfix.tls")
 local S = require("syscall")
 
 local ntohs = lib.ntohs
@@ -498,6 +499,21 @@ local function HTTP_accumulate(self, dst, new, pkt)
    end
 end
 
+-- HTTPS-specific statistics counters
+local function HTTPS_counters()
+   return {
+      HTTPS_client_hellos = 0,
+      HTTPS_extensions_present = 0,
+      HTTPS_snis = 0,
+   }
+end
+
+local function HTTPS_accumulate(self, dst, new, pkt)
+   accumulate_generic(dst, new)
+   accumulate_tcp_flags_reduced(dst, new)
+   tls.accumulate(self, dst.value, pkt)
+end
+
 local function DNS_extract(self, pkt, timestamp, entry, extract_addr_fn)
    local md = metadata_get(pkt)
    extract_5_tuple(pkt, timestamp, entry, md, extract_addr_fn)
@@ -720,6 +736,26 @@ templates = {
          end,
       accumulate = DNS_accumulate
    },
+   v4_HTTPS = {
+      id = 259,
+      filter = "ip and tcp and (dst port 443 or dst port 8443)",
+      aggregation_type = 'v4',
+      keys = { "sourceIPv4Address",
+               "destinationIPv4Address",
+               "protocolIdentifier",
+               "sourceTransportPort",
+               "destinationTransportPort" },
+      values = { "flowStartMilliseconds",
+                 "flowEndMilliseconds",
+                 "packetDeltaCount",
+                 "octetDeltaCount",
+                 "tcpControlBitsReduced",
+                 "tlsSNI=64",
+                 "tlsSNILength"},
+      counters = HTTPS_counters(),
+      extract = v4_extract,
+      accumulate = HTTPS_accumulate,
+   },
    v4_extended = {
       id = 1256,
       filter = "ip",
@@ -839,6 +875,26 @@ templates = {
          end,
       accumulate = DNS_accumulate
    },
+   v6_HTTPS = {
+      id = 515,
+      filter = "ip6 and tcp and (dst port 443 or dst port 8443)",
+      aggregation_type = 'v6',
+      keys = { "sourceIPv6Address",
+               "destinationIPv6Address",
+               "protocolIdentifier",
+               "sourceTransportPort",
+               "destinationTransportPort" },
+      values = { "flowStartMilliseconds",
+                 "flowEndMilliseconds",
+                 "packetDeltaCount",
+                 "octetDeltaCount",
+                 "tcpControlBitsReduced",
+                 "tlsSNI=64",
+                 "tlsSNILength"},
+      counters = HTTPS_counters(),
+      extract = v6_extract,
+      accumulate = HTTPS_accumulate,
+   },
    v6_extended = {
       id = 1512,
       filter = "ip6",
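Judging by the truncating `ffi.copy` in tls.lua below, the `tlsSNI=64` syntax in the value lists above appears to reserve a fixed 64-byte field in the flow entry, with `tlsSNILength` carrying the true length so a collector can detect truncation. A hedged sketch of the resulting layout and fill logic (the real entry struct is generated from the template definition; `tls_fields_t` and `set_sni` are illustrative names):

```lua
local ffi = require("ffi")

-- Hypothetical flow-entry fragment holding the two TLS fields.
local tls_fields_t = ffi.typeof([[
   struct {
      uint8_t  tlsSNI[64];
      uint16_t tlsSNILength;
   }
]])

local function set_sni (entry, name, name_length)
   -- Store at most 64 bytes, but record the full on-the-wire length.
   ffi.copy(entry.tlsSNI, name,
            math.min(ffi.sizeof(entry.tlsSNI), name_length))
   entry.tlsSNILength = name_length
end

local e = tls_fields_t()
set_sni(e, "example.org", #"example.org")
assert(e.tlsSNILength == 11)
```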
diff --git a/src/apps/ipfix/tls.lua b/src/apps/ipfix/tls.lua
new file mode 100644
index 0000000000..00c71bde40
--- /dev/null
+++ b/src/apps/ipfix/tls.lua
@@ -0,0 +1,177 @@
+module(..., package.seeall)
+
+local ffi = require("ffi")
+local lib = require("core.lib")
+local metadata = require("apps.rss.metadata")
+
+local metadata_get = metadata.get
+local ntohs = lib.ntohs
+
+local types = {
+   record_t = ffi.typeof([[
+      struct {
+         uint8_t type;
+         uint16_t version;
+         uint16_t length;
+         uint8_t data[0];
+      } __attribute__((packed))
+   ]]),
+   handshake_t = ffi.typeof([[
+      struct {
+         uint8_t msg_type;
+         uint8_t length_msb;
+         uint16_t length;
+         uint8_t data[0];
+      } __attribute__((packed))
+   ]]),
+   client_hello_t = ffi.typeof([[
+      struct {
+         uint16_t version;
+         uint8_t random[32];
+         uint8_t data[0];
+      } __attribute__((packed))
+   ]]),
+   lv1_t = ffi.typeof([[
+      struct {
+         uint8_t length;
+         uint8_t data[0];
+      } __attribute__((packed))
+   ]]),
+   lv2_t = ffi.typeof([[
+      struct {
+         uint16_t length;
+         uint8_t data[0];
+      } __attribute__((packed))
+   ]]),
+   extensions_t = ffi.typeof([[
+      struct {
+         uint16_t length;
+         uint8_t data[0];
+      } __attribute__((packed))
+   ]]),
+   tlv_t = ffi.typeof([[
+      struct {
+         uint16_t type;
+         uint16_t length;
+         uint8_t value[0];
+      } __attribute__((packed))
+   ]]),
+   sni_t = ffi.typeof([[
+      struct {
+         uint16_t list_length;
+         uint8_t name_type;
+         uint16_t name_length;
+         uint8_t name[0];
+      } __attribute__((packed))
+   ]])
+}
+local ptrs = {}
+for n, t in pairs(types) do
+   ptrs[n] = ffi.typeof("$*", t)
+end
+
+local function skip_lv1(data)
+   local tlv = ffi.cast(ptrs.lv1_t, data)
+   return tlv.data + tlv.length
+end
+
+local function skip_lv2(data)
+   local tlv = ffi.cast(ptrs.lv2_t, data)
+   return tlv.data + ntohs(tlv.length)
+end
+
+local function tcp_header_size(l4)
+   local offset = bit.rshift(ffi.cast("uint8_t*", l4)[12], 4)
+   return offset * 4
+end
+
+local function out_of_bounds (eop, ptr, size)
+   return ffi.cast("uint8_t *", ptr) + size > eop
+end
+
+function accumulate(self, entry, pkt)
+   local md = metadata_get(pkt)
+   -- The TLS handshake starts right after the TCP handshake,
+   -- i.e. either in the second (piggy-backed on the handshake ACK) or
+   -- third packet of the flow.
+   local payload = md.l4 + tcp_header_size(md.l4)
+   -- The effective payload size is the amount of the payload that is
+   -- actually present. This can be smaller than the actual payload
+   -- size if the packet has been truncated, e.g. by a port-mirror. It
+   -- can also be larger if the packet has been padded to the minimum
+   -- frame size (64 bytes). This can be safely ignored.
+   local eff_payload_size = pkt.length - md.l3_offset - (payload - md.l3)
+   if ((entry.packetDeltaCount == 1 or -- SYN
+        (entry.packetDeltaCount == 2 and eff_payload_size == 0) or -- Empty ACK
+        entry.packetDeltaCount > 3)) then
+      return
+   end
+   -- End Of Payload (first byte after the effective payload), used
+   -- for bounds check
+   local eop = payload + eff_payload_size
+
+   -- Check bounds for the fixed-size part of the message
+   if out_of_bounds(eop, payload,
+                    ffi.sizeof(types.record_t) +
+                    ffi.sizeof(types.handshake_t) +
+                    ffi.sizeof(types.client_hello_t) +
+                    -- Header of session ID TLV
+                    ffi.sizeof(types.lv1_t)) then
+      return
+   end
+
+   local record = ffi.cast(ptrs.record_t, payload)
+   -- Handshake record?
+   if record.type ~= 22 then return end
+   local handshake = ffi.cast(ptrs.handshake_t, record.data)
+
+   -- Client Hello?
+   if handshake.msg_type ~= 1 then return end
+   local client_hello = ffi.cast(ptrs.client_hello_t, handshake.data)
+   -- Only parse extensions for TLS 1.2 and later
+   if ntohs(client_hello.version) < 0x0303 then return end
+   self.counters.HTTPS_client_hellos = self.counters.HTTPS_client_hellos + 1
+
+   -- End Of Client Hello, used to check for the presence of extensions
+   local eoh = ffi.cast("uint8_t *", client_hello) +
+      ntohs(handshake.length) + 65536 * handshake.length_msb
+
+   -- Skip session ID
+   local tmp = skip_lv1(client_hello.data)
+   if out_of_bounds(eop, tmp, ffi.sizeof(types.lv2_t)) then return end
+
+   -- Skip cipher suites
+   tmp = skip_lv2(tmp)
+   if out_of_bounds(eop, tmp, ffi.sizeof(types.lv1_t)) then return end
+
+   -- Skip compression methods
+   local extensions = ffi.cast(ptrs.extensions_t, skip_lv1(tmp))
+   -- Extensions present?
+   if ffi.cast("uint8_t *", extensions) >= eoh then return end
+   self.counters.HTTPS_extensions_present = self.counters.HTTPS_extensions_present + 1
+
+   local extensions_length = ntohs(extensions.length)
+   -- Find the SNI extension
+   local extension = extensions.data
+   while (extensions_length > 0) do
+      if out_of_bounds(eop, extension, ffi.sizeof(types.tlv_t)) then return end
+      local tlv = ffi.cast(ptrs.tlv_t, extension)
+      if ntohs(tlv.type) == 0 then
+         -- SNI, list of server names (RFC6066), extract the entry of
+         -- type 0 (DNS hostname). This is the only type currently
+         -- defined so must be the first and only entry in the
+         -- list. To be future-proof, we should really skip names of
+         -- different types.
+         local sni = ffi.cast(ptrs.sni_t, tlv.value)
+         if sni.name_type ~= 0 then return end
+         local name_length = ntohs(sni.name_length)
+         if out_of_bounds(eop, sni.name, name_length) then return end
+         ffi.copy(entry.tlsSNI, sni.name,
+                  math.min(ffi.sizeof(entry.tlsSNI), name_length))
+         entry.tlsSNILength = name_length
+         self.counters.HTTPS_snis = self.counters.HTTPS_snis + 1
+         return
+      end
+      local length = ntohs(tlv.length)
+      extensions_length = extensions_length - length - ffi.sizeof(types.tlv_t)
+      extension = tlv.value + length
+   end
+end
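Two bits of arithmetic in tls.lua above are easy to misread. `tcp_header_size` decodes the TCP data-offset nibble (32-bit words), and the handshake length is a 24-bit field that `handshake_t` models as one MSB byte plus a big-endian 16-bit word. A quick sanity check of both:

```lua
local bit = require("bit")

-- TCP data offset: upper nibble of byte 12 of the TCP header, in
-- 32-bit words. 0x80 means 8 words = 32 bytes (20 fixed + 12 bytes
-- of options); the minimum legal value is 0x50 = 20 bytes, no options.
assert(bit.rshift(0x80, 4) * 4 == 32)
assert(bit.rshift(0x50, 4) * 4 == 20)

-- TLS handshake length: 24 bits, reassembled as
-- ntohs(length) + 65536 * length_msb. The 16-bit part is shown here
-- already byte-swapped to host order, as ntohs would return it.
local length_msb, length_host = 0x01, 0xF3A2
assert(length_host + 65536 * length_msb == 0x01F3A2)
```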
diff --git a/src/apps/ipv4/reassemble.lua b/src/apps/ipv4/reassemble.lua
index 7e48ebac59..f42e8f1112 100644
--- a/src/apps/ipv4/reassemble.lua
+++ b/src/apps/ipv4/reassemble.lua
@@ -266,7 +266,7 @@ function Reassembler:handle_fragment(h, fragment)
          -- Prevent a buffer overflow. The relevant RFC allows hosts to
          -- silently discard reassemblies above a certain rather small
          -- size, smaller than this.
-         return self:reassembly_error()
+         return self:reassembly_error(entry)
       end
       ffi.copy(reassembly.packet.data + dst_offset,
                fragment.data + skip_headers, frag_size)
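The buffer cap that the comment above refers to exists because the IPv4 header can demand far more reassembly space than any single fragment carries. The fragment-offset field is 13 bits wide and counts 8-byte units, so a hostile or broken sender can place data close to the 64 KiB mark, which is presumably why the reassembler bounds its buffer and counts an error instead of growing it:

```lua
local bit = require("bit")

-- Largest encodable fragment start: (2^13 - 1) offset units of
-- 8 bytes each, i.e. almost 64 KiB into the reassembly buffer.
assert((bit.lshift(1, 13) - 1) * 8 == 65528)
```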
@@ -349,9 +349,10 @@ function selftest()
 
    -- Returns a new packet containing an Ethernet frame with an IPv4
    -- header followed by PAYLOAD_SIZE random bytes.
-   local function make_test_packet(payload_size)
-      local pkt = packet.from_pointer(lib.random_bytes(payload_size),
-                                      payload_size)
+   local function make_test_packet(payload_size, pkt)
+      pkt = pkt or packet.allocate()
+      ffi.copy(pkt.data, lib.random_bytes(payload_size), payload_size)
+      pkt.length = payload_size
       local eth_h = ether:new({ src = random_mac(), dst = random_mac(),
                                 type = ethertype_ipv4 })
       local ip_h = ipv4:new({ src = random_ipv4(), dst = random_ipv4(),
@@ -370,7 +371,7 @@ function selftest()
       fragment.shm = shm.create_frame("apps/fragmenter", fragment.shm)
       fragment.input = { input = link.new('fragment input') }
       fragment.output = { output = link.new('fragment output') }
-      link.transmit(fragment.input.input, packet.clone(pkt))
+      link.transmit(fragment.input.input, pkt)
       fragment:push()
       local ret = {}
       while not link.empty(fragment.output.output) do
@@ -398,7 +399,7 @@ function selftest()
    for _, size in ipairs({100, 400, 1000, 1500, 2000}) do
       local pkt = make_test_packet(size)
       for _, mtu in ipairs({512, 1000, 1500}) do
-         local fragments = fragment(pkt, mtu)
+         local fragments = fragment(packet.clone(pkt), mtu)
          for _, order in ipairs(permute_indices(1, #fragments)) do
            local reassembler = Reassembler:new {
               max_concurrent_reassemblies = 100,
@@ -436,5 +437,99 @@ function selftest()
       packet.free(pkt)
    end
 
+   -- test reassembly errors
+
+   -- too many fragments
+   local pkt = make_test_packet(9000)
+   local fragments = fragment(pkt, 128)
+   local reassembler = Reassembler:new {
+      max_concurrent_reassemblies = 100,
+      max_fragments_per_reassembly = 20
+   }
+   reassembler.shm = shm.create_frame(
+      "apps/reassembler", reassembler.shm)
+   reassembler.input = { input = link.new('reassembly input') }
+   reassembler.output = { output = link.new('reassembly output') }
+   for _, f in ipairs(fragments) do
+      link.transmit(reassembler.input.input, f)
+   end
+   reassembler:push()
+   assert(link.empty(reassembler.output.output))
+   assert(counter.read(reassembler.shm["drop-ipv4-frag-invalid-reassembly"]) == 4)
+   shm.delete_frame(reassembler.shm)
+
+   -- more than one final fragment
+   local pkt = make_test_packet(1500)
+   local fragments = fragment(pkt, 512)
+   local reassembler = Reassembler:new {
+      max_concurrent_reassemblies = 100,
+      max_fragments_per_reassembly = 20
+   }
+   reassembler.shm = shm.create_frame(
+      "apps/reassembler", reassembler.shm)
+   reassembler.input = { input = link.new('reassembly input') }
+   reassembler.output = { output = link.new('reassembly output') }
+   link.transmit(reassembler.input.input, packet.clone(fragments[#fragments]))
+   link.transmit(reassembler.input.input, packet.clone(fragments[#fragments]))
+   reassembler:push()
+   assert(link.empty(reassembler.output.output))
+   assert(counter.read(reassembler.shm["drop-ipv4-frag-invalid-reassembly"]) == 1)
+   shm.delete_frame(reassembler.shm)
+
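The buffer-overflow test below cannot use the normal packet pool, since it needs a packet bigger than the pool allows, so it carves a fake `struct packet` out of a plain byte buffer. The `lib.align` call rounds the buffer address up to `packet.packet_alignment` before the default headroom is added; a sketch of that rounding (the helper name here is illustrative):

```lua
-- Round addr up to the next multiple of a (a power of two in practice).
local function align_up (addr, a)
   local rem = addr % a
   if rem == 0 then return addr end
   return addr + a - rem
end

assert(align_up(1024, 512) == 1024)   -- already aligned
assert(align_up(1025, 512) == 1536)   -- rounded up
```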
+   -- reassembly buffer overflow
+   local buffer = ffi.new("uint8_t[?]", 50000)
+   local p = ffi.cast(
+      "struct packet *",
+      lib.align(ffi.cast("uintptr_t", buffer), packet.packet_alignment) +
+         packet.default_headroom
+   )
+   local pkt = make_test_packet(15000, p)
+   local fragments = fragment(pkt, 1500)
+   local reassembler = Reassembler:new {
+      max_concurrent_reassemblies = 100,
+      max_fragments_per_reassembly = 20
+   }
+   reassembler.shm = shm.create_frame(
+      "apps/reassembler", reassembler.shm)
+   reassembler.input = { input = link.new('reassembly input') }
+   reassembler.output = { output = link.new('reassembly output') }
+   for _, f in ipairs(fragments) do
+      link.transmit(reassembler.input.input, f)
+   end
+   reassembler:push()
+   assert(link.empty(reassembler.output.output))
+   assert(counter.read(reassembler.shm["drop-ipv4-frag-invalid-reassembly"]) == 5)
+   shm.delete_frame(reassembler.shm)
+
+   -- invalid offsets
+   local pkt = make_test_packet(1500)
+   local fragments = fragment(pkt, 768)
+   local reassembler = Reassembler:new {
+      max_concurrent_reassemblies = 100,
+      max_fragments_per_reassembly = 20
+   }
+   reassembler.shm = shm.create_frame(
+      "apps/reassembler", reassembler.shm)
+   reassembler.input = { input = link.new('reassembly input') }
+   reassembler.output = { output = link.new('reassembly output') }
+   link.transmit(reassembler.input.input, packet.clone(fragments[1]))
+   link.transmit(reassembler.input.input, packet.clone(fragments[2]))
+   reassembler:push()
+   local final = fragments[3]
+   local h = ffi.cast(ether_ipv4_header_ptr_t, final.data)
+   local reassembly = reassembler:lookup_reassembly(h, final).value
+   reassembly.fragment_starts[0] = 1
+   link.transmit(reassembler.input.input, packet.clone(final))
+   reassembler:push()
+   link.transmit(reassembler.input.input, packet.clone(fragments[1]))
+   link.transmit(reassembler.input.input, packet.clone(fragments[2]))
+   reassembler:push()
+   local final = fragments[3]
+   local h = ffi.cast(ether_ipv4_header_ptr_t, final.data)
+   local reassembly = reassembler:lookup_reassembly(h, final).value
+   reassembly.fragment_starts[0] = 0
+   reassembly.fragment_ends[0] = 10
+   link.transmit(reassembler.input.input, packet.clone(final))
+   reassembler:push()
+   assert(link.empty(reassembler.output.output))
+   assert(counter.read(reassembler.shm["drop-ipv4-frag-invalid-reassembly"]) == 2)
+   shm.delete_frame(reassembler.shm)
+
    print("selftest: ok")
 end
diff --git a/src/apps/rss/README.md b/src/apps/rss/README.md
index ed488e1770..931b4adf1f 100644
--- a/src/apps/rss/README.md
+++ b/src/apps/rss/README.md
@@ -116,9 +116,9 @@ and link #2 will get 2/3 of the traffic.
 
 In order to compute the hash over the header fields, the `rss` app
 must parse the packets to a certain extent.  Internally, the result of
-this analysis is appended as a block of data to the end of the actual
-packet data.  Because this data can be useful to other apps downstream
-of the `rss` app, it is exposed as part of the API.
+this analysis is prepended as a block of data to the start of the
+actual packet data.  Because this data can be useful to other apps
+downstream of the `rss` app, it is exposed as part of the API.
 
 The meta-data is structured as follows
 
@@ -378,7 +378,12 @@ real VLAN ID or not).
 ## Meta-data API
 
 The meta-data functionality is provided by the module
-`apps.rss.metadata` and provides the following API.
+`apps.rss.metadata` and provides the following API. The metadata is
+stored in the area of the packet buffer that is reserved as headroom
+for prepending headers to the packet. Consequently, using any of the
+functions that add or remove headers (`append`, `prepend`,
+`shiftleft`, `shiftright` from `core.packet`) will invalidate the
+metadata.
 
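The warning just added to the README follows from how the metadata is now located (see the `md_ptr` change in `metadata.lua` further down): packets are allocated on `packet_alignment` boundaries and the `struct packet` pointer is then advanced by the headroom, so the pointer's offset within its alignment block equals the headroom still in front of it, and the metadata sits at the very end of that headroom, directly before the struct. A sketch of the pointer arithmetic (packet_alignment = 512 is an assumption here; see `core.packet` for the real constants):

```lua
local ffi = require("ffi")
local bit = require("bit")

local packet_alignment = 512   -- assumed, see core.packet

-- Recover the headroom in front of an aligned-then-advanced
-- allocation from the pointer value alone.
local function headroom_of (ptr)
   return tonumber(bit.band(ffi.cast("uint64_t", ptr),
                            packet_alignment - 1))
end

-- A pointer 256 bytes past a 512-byte boundary has 256 bytes of
-- headroom; metadata of size n then lives at ptr - n. Prepending a
-- header shrinks the headroom and overwrites that region.
assert(headroom_of(ffi.cast("void *", 512 + 256)) == 256)
```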
 — Function **add** *packet*, *remove_extension_headers*, *vlan*
diff --git a/src/apps/rss/metadata.lua b/src/apps/rss/metadata.lua
index 22ecec54cd..658355c253 100644
--- a/src/apps/rss/metadata.lua
+++ b/src/apps/rss/metadata.lua
@@ -229,8 +229,10 @@ pkt_meta_data_t = ffi.typeof([[
 pkt_meta_data_ptr_t = ptr_to(pkt_meta_data_t)
 
 local function md_ptr (pkt)
-   assert(ffi.C.PACKET_PAYLOAD_SIZE - pkt.length >= ffi.sizeof(pkt_meta_data_t))
-   return ffi.cast(pkt_meta_data_ptr_t, pkt.data + pkt.length)
+   local headroom = bit.band(ffi.cast("uint64_t", pkt), packet.packet_alignment - 1)
+   local md_len = ffi.sizeof(pkt_meta_data_t)
+   assert(headroom >= md_len)
+   return ffi.cast(pkt_meta_data_ptr_t, ffi.cast("uint8_t*", pkt) - md_len)
 end
 
 local function set_pointers (md, pkt)
diff --git a/src/apps/rss/rss.lua b/src/apps/rss/rss.lua
index 225bf32a74..db626d6a79 100644
--- a/src/apps/rss/rss.lua
+++ b/src/apps/rss/rss.lua
@@ -177,7 +177,7 @@ function rss:new (config)
          end
       end
    else
-      pf_fn = pf.compile_filter(config.filter)
+      local pf_fn = pf.compile_filter(config.filter)
       match_fn = function(md)
          return pf_fn(md.filter_start, md.filter_length)
       end
diff --git a/src/lib/yang/snabb-snabbflow-v1.yang b/src/lib/yang/snabb-snabbflow-v1.yang
index 87e75d0949..6f2696783d 100644
--- a/src/lib/yang/snabb-snabbflow-v1.yang
+++ b/src/lib/yang/snabb-snabbflow-v1.yang
@@ -132,6 +132,15 @@ module snabb-snabbflow-v1 {
           executed in one or more instances, each in its own dedicated
           worker process (using a dedicated CPU core).";
       }
+
+      leaf acquire-cpu {
+        type boolean;
+        default true;
+        description
+          "When 'embed' is false, select whether the worker process
+           should be pinned to a dedicated CPU core. Has no effect when
+           'embed' is set to true.";
+      }
 
       leaf instances {
         type uint32 { range 1..max; }
diff --git a/src/program/ipfix/probe/probe.lua b/src/program/ipfix/probe/probe.lua
index e29911a8e2..518ca51923 100644
--- a/src/program/ipfix/probe/probe.lua
+++ b/src/program/ipfix/probe/probe.lua
@@ -326,7 +326,8 @@ function setup_workers (config)
          -- Dedicated exporter processes are restartable
          worker_opts[rss_link] = {
             restart_intensity = software_scaling.restart.intensity,
-            restart_period = software_scaling.restart.period
+            restart_period = software_scaling.restart.period,
+            acquire_cpu = software_scaling.acquire_cpu
          }
       end
       table.insert(outputs, output)
@@ -405,4 +406,4 @@ function setup_workers (config)
    end
 
    return workers, worker_opts
-end
\ No newline at end of file
+end
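Finally, the one-line rss.lua change above deserves a note: without `local`, `pf_fn` was an accidental global, so every `rss` instance configured with a filter shared, and clobbered, the same compiled filter at call time. The failure mode in miniature (plain Lua, illustrative names):

```lua
local function make_matcher (pred)
   fn = pred                               -- oops: global `fn`
   return function (x) return fn(x) end   -- reads the global at call time
end

local is_odd  = make_matcher(function (x) return x % 2 == 1 end)
local is_even = make_matcher(function (x) return x % 2 == 0 end)

-- Both closures now consult the same global, so is_odd has silently
-- become an "is even" test, just as a second rss instance replaced
-- the first one's compiled filter.
assert(is_odd(2) == true)
```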