diff --git a/scapy/all.py b/scapy/all.py index b78326caf8f..f9279eafc16 100644 --- a/scapy/all.py +++ b/scapy/all.py @@ -26,6 +26,7 @@ from scapy.utils import * from scapy.route import * from scapy.sendrecv import * +from scapy.sessions import * from scapy.supersocket import * from scapy.volatile import * from scapy.as_resolvers import * diff --git a/scapy/layers/inet.py b/scapy/layers/inet.py index bcccce7fcd6..bba28828d71 100644 --- a/scapy/layers/inet.py +++ b/scapy/layers/inet.py @@ -1003,6 +1003,46 @@ def overlap_frag(p, overlap, fragsize=8, overlap_fragsize=None): return qfrag + fragment(p, fragsize) +def _defrag_list(lst, defrag, missfrag): + """Internal usage only. Part of the _defrag_logic""" + p = lst[0] + lastp = lst[-1] + if p.frag > 0 or lastp.flags & 1 != 0: # first or last fragment missing # noqa: E501 + missfrag.append(lst) + return + p = p.copy() + if conf.padding_layer in p: + del(p[conf.padding_layer].underlayer.payload) + ip = p[IP] + if ip.len is None or ip.ihl is None: + clen = len(ip.payload) + else: + clen = ip.len - (ip.ihl << 2) + txt = conf.raw_layer() + for q in lst[1:]: + if clen != q.frag << 3: # Wrong fragmentation offset + if clen > q.frag << 3: + warning("Fragment overlap (%i > %i) %r || %r || %r" % (clen, q.frag << 3, p, txt, q)) # noqa: E501 + missfrag.append(lst) + break + if q[IP].len is None or q[IP].ihl is None: + clen += len(q[IP].payload) + else: + clen += q[IP].len - (q[IP].ihl << 2) + if conf.padding_layer in q: + del(q[conf.padding_layer].underlayer.payload) + txt.add_payload(q[IP].payload.copy()) + if q.time > p.time: + p.time = q.time + else: + ip.flags &= ~1 # !MF + del(ip.chksum) + del(ip.len) + p = p / txt + p._defrag_pos = max(x._defrag_pos for x in lst) + defrag.append(p) + + def _defrag_logic(plist, complete=False): """Internal function used to defragment a list of packets. It contains the logic behind the defrag() and defragment() functions @@ -1015,7 +1055,7 @@ def _defrag_logic(plist, complete=False): pos += 1 if IP in p: ip = p[IP] - if ip.frag != 0 or ip.flags & 1: + if ip.frag != 0 or ip.flags.MF: uniq = (ip.id, ip.src, ip.dst, ip.proto) frags[uniq].append(p) continue @@ -1025,42 +1065,7 @@ def _defrag_logic(plist, complete=False): missfrag = [] for lst in six.itervalues(frags): lst.sort(key=lambda x: x.frag) - p = lst[0] - lastp = lst[-1] - if p.frag > 0 or lastp.flags & 1 != 0: # first or last fragment missing # noqa: E501 - missfrag.append(lst) - continue - p = p.copy() - if conf.padding_layer in p: - del(p[conf.padding_layer].underlayer.payload) - ip = p[IP] - if ip.len is None or ip.ihl is None: - clen = len(ip.payload) - else: - clen = ip.len - (ip.ihl << 2) - txt = conf.raw_layer() - for q in lst[1:]: - if clen != q.frag << 3: # Wrong fragmentation offset - if clen > q.frag << 3: - warning("Fragment overlap (%i > %i) %r || %r || %r" % (clen, q.frag << 3, p, txt, q)) # noqa: E501 - missfrag.append(lst) - break - if q[IP].len is None or q[IP].ihl is None: - clen += len(q[IP].payload) - else: - clen += q[IP].len - (q[IP].ihl << 2) - if conf.padding_layer in q: - del(q[conf.padding_layer].underlayer.payload) - txt.add_payload(q[IP].payload.copy()) - if q.time > p.time: - p.time = q.time - else: - ip.flags &= ~1 # !MF - del(ip.chksum) - del(ip.len) - p = p / txt - p._defrag_pos = max(x._defrag_pos for x in lst) - defrag.append(p) + _defrag_list(lst, defrag, missfrag) defrag2 = [] for p in defrag: q = p.__class__(raw(p)) diff --git a/scapy/layers/netflow.py b/scapy/layers/netflow.py index b071894eab1..37f2a0d9c3e 100644 --- a/scapy/layers/netflow.py +++ b/scapy/layers/netflow.py @@ -8,10 +8,14 @@ """ Cisco NetFlow protocol v1, v5 and v9 -HowTo debug NetflowV9 packets: +HowTo dissect NetflowV9 packets: + +# From a pcap - get a list of packets containing NetflowV9 packets -- call netflowv9_defragment(plist) to defragment the list -Caution: this API might be updated +- call `netflowv9_defragment(plist)` to defragment the list + +# Live / on-the-flow: +>>> sniff(session=NetflowSession, prn=[...]) """ import struct @@ -25,6 +29,7 @@ StrField, StrFixedLenField, ThreeBytesField, UTCTimeField, XByteField, \ XShortField from scapy.packet import Packet, bind_layers, bind_bottom_up +from scapy.sessions import IPSession, DefaultSession from scapy.layers.inet import UDP from scapy.layers.inet6 import IP6Field @@ -35,8 +40,10 @@ class NetflowHeader(Packet): fields_desc = [ShortField("version", 1)] -bind_bottom_up(UDP, NetflowHeader, dport=2055) -bind_bottom_up(UDP, NetflowHeader, sport=2055) +for port in [2055, 2056, 9995, 9996, 6343]: # Classic NetFlow ports + bind_bottom_up(UDP, NetflowHeader, dport=port) + bind_bottom_up(UDP, NetflowHeader, sport=port) +# However, we'll default to 2055, classic among classics :) bind_layers(UDP, NetflowHeader, dport=2055, sport=2055) ########################################### @@ -431,6 +438,104 @@ def dispatch_hook(cls, _pkt=None, *args, **kargs): return cls +def _netflowv9_defragment_packet(pkt, definitions, definitions_opts, ignored): + """Used internally to process a single packet during defragmenting""" + # Dataflowset definitions + if NetflowFlowsetV9 in pkt: + current = pkt + while NetflowFlowsetV9 in current: + current = current[NetflowFlowsetV9] + for ntv9 in current.templates: + llist = [] + for tmpl in ntv9.template_fields: + llist.append((tmpl.fieldLength, tmpl.fieldType)) + if llist: + tot_len = sum(x[0] for x in llist) + cls = _GenNetflowRecordV9(NetflowRecordV9, llist) + definitions[ntv9.templateID] = (tot_len, cls) + current = current.payload + # Options definitions + if NetflowOptionsFlowsetV9 in pkt: + current = pkt + while NetflowOptionsFlowsetV9 in current: + current = current[NetflowOptionsFlowsetV9] + # Load scopes + llist = [] + for scope in current.scopes: + llist.append(( + scope.scopeFieldlength, + scope.scopeFieldType + )) + scope_tot_len = sum(x[0] for x in llist) + scope_cls = _GenNetflowRecordV9( + NetflowOptionsRecordScopeV9, + llist + ) + # Load options + llist = [] + for opt in current.options: + llist.append(( + opt.optionFieldlength, + opt.optionFieldType + )) + option_tot_len = sum(x[0] for x in llist) + option_cls = _GenNetflowRecordV9( + NetflowOptionsRecordOptionV9, + llist + ) + # Storage + definitions_opts[current.templateID] = ( + scope_tot_len, scope_cls, + option_tot_len, option_cls + ) + current = current.payload + # Dissect flowsets + if NetflowDataflowsetV9 in pkt: + datafl = pkt[NetflowDataflowsetV9] + tid = datafl.templateID + if tid not in definitions and tid not in definitions_opts: + ignored.add(tid) + return + # All data is stored in one record, awaiting to be split + # If fieldValue is available, the record has not been + # defragmented: pop it + try: + data = datafl.records[0].fieldValue + datafl.records.pop(0) + except (IndexError, AttributeError): + return + res = [] + # Flowset record + # Now, according to the flow/option data, + # let's re-dissect NetflowDataflowsetV9 + if tid in definitions: + tot_len, cls = definitions[tid] + while len(data) >= tot_len: + res.append(cls(data[:tot_len])) + data = data[tot_len:] + # Inject dissected data + datafl.records = res + datafl.do_dissect_payload(data) + # Options + elif tid in definitions_opts: + (scope_len, scope_cls, + option_len, option_cls) = definitions_opts[tid] + # Dissect scopes + if scope_len: + res.append(scope_cls(data[:scope_len])) + if option_len: + res.append( + option_cls(data[scope_len:scope_len + option_len]) + ) + if len(data) > scope_len + option_len: + res.append( + conf.padding_layer(data[scope_len + option_len:]) + ) + # Inject dissected data + datafl.records = res + datafl.name = "Netflow DataFlowSet V9 - OPTIONS" + + def netflowv9_defragment(plist, verb=1): """Process all NetflowV9 Packets to match IDs of the DataFlowsets with the Headers @@ -444,106 +549,38 @@ def netflowv9_defragment(plist, verb=1): definitions_opts = {} ignored = set() # Iterate through initial list - for pkt in plist: # NetflowDataflowsetV9: - # Dataflowset definitions - if NetflowFlowsetV9 in pkt: - current = pkt - while NetflowFlowsetV9 in current: - current = current[NetflowFlowsetV9] - for ntv9 in current.templates: - llist = [] - for tmpl in ntv9.template_fields: - llist.append((tmpl.fieldLength, tmpl.fieldType)) - if llist: - tot_len = sum(x[0] for x in llist) - cls = _GenNetflowRecordV9(NetflowRecordV9, llist) - definitions[ntv9.templateID] = (tot_len, cls) - current = current.payload - # Options definitions - if NetflowOptionsFlowsetV9 in pkt: - current = pkt - while NetflowOptionsFlowsetV9 in current: - current = current[NetflowOptionsFlowsetV9] - # Load scopes - llist = [] - for scope in current.scopes: - llist.append(( - scope.scopeFieldlength, - scope.scopeFieldType - )) - scope_tot_len = sum(x[0] for x in llist) - scope_cls = _GenNetflowRecordV9( - NetflowOptionsRecordScopeV9, - llist - ) - # Load options - llist = [] - for opt in current.options: - llist.append(( - opt.optionFieldlength, - opt.optionFieldType - )) - option_tot_len = sum(x[0] for x in llist) - option_cls = _GenNetflowRecordV9( - NetflowOptionsRecordOptionV9, - llist - ) - # Storage - definitions_opts[current.templateID] = ( - scope_tot_len, scope_cls, - option_tot_len, option_cls - ) - current = current.payload - # Dissect flowsets - if NetflowDataflowsetV9 in pkt: - datafl = pkt[NetflowDataflowsetV9] - tid = datafl.templateID - if tid not in definitions and tid not in definitions_opts: - ignored.add(tid) - continue - # All data is stored in one record, awaiting to be split - # If fieldValue is available, the record has not been - # defragmented: pop it - try: - data = datafl.records[0].fieldValue - datafl.records.pop(0) - except (IndexError, AttributeError): - continue - res = [] - # Flowset record - # Now, according to the flow/option data, - # let's re-dissect NetflowDataflowsetV9 - if tid in definitions: - tot_len, cls = definitions[tid] - while len(data) >= tot_len: - res.append(cls(data[:tot_len])) - data = data[tot_len:] - # Inject dissected data - datafl.records = res - datafl.do_dissect_payload(data) - # Options - elif tid in definitions_opts: - (scope_len, scope_cls, - option_len, option_cls) = definitions_opts[tid] - # Dissect scopes - if scope_len: - res.append(scope_cls(data[:scope_len])) - if option_len: - res.append( - option_cls(data[scope_len:scope_len + option_len]) - ) - if len(data) > scope_len + option_len: - res.append( - conf.padding_layer(data[scope_len + option_len:]) - ) - # Inject dissected data - datafl.records = res - datafl.name = "Netflow DataFlowSet V9 - OPTIONS" + for pkt in plist: + _netflowv9_defragment_packet(pkt, + definitions, + definitions_opts, + ignored) if conf.verb >= 1 and ignored: warning("Ignored templateIDs (missing): %s" % list(ignored)) return plist +class NetflowSession(IPSession): + def __init__(self, *args): + IPSession.__init__(self, *args) + self.definitions = {} + self.definitions_opts = {} + self.ignored = set() + + def _process_packet(self, pkt): + _netflowv9_defragment_packet(pkt, + self.definitions, + self.definitions_opts, + self.ignored) + return pkt + + def on_packet_received(self, pkt): + # First, defragment IP if necessary + pkt = self._ip_process_packet(pkt) + # Now handle NetflowV9 defragmentation + pkt = self._process_packet(pkt) + DefaultSession.on_packet_received(self, pkt) + + class NetflowOptionsRecordScopeV9(NetflowRecordV9): name = "Netflow Options Template Record V9 - Scope" diff --git a/scapy/sendrecv.py b/scapy/sendrecv.py index 0976d7f1dbb..f204371d9c1 100644 --- a/scapy/sendrecv.py +++ b/scapy/sendrecv.py @@ -24,11 +24,12 @@ from scapy.packet import Packet, Gen from scapy.utils import get_temp_file, tcpdump, wrpcap, \ ContextManagerSubprocess, PcapReader -from scapy import plist +from scapy.plist import PacketList, SndRcvList from scapy.error import log_runtime, log_interactive from scapy.base_classes import SetGen from scapy.modules import six from scapy.modules.six.moves import map +from scapy.sessions import DefaultSession from scapy.supersocket import SuperSocket if conf.route is None: # unused import, only to initialize conf.route @@ -176,9 +177,9 @@ def sndrcv(pks, pkt, timeout=None, inter=0, verbose=None, chainCC=False, if process is not None: use_prn_mode = True _storage_policy = lambda x, y: process(x, y) - debug.recv = plist.PacketList([], "Unanswered") - debug.sent = plist.PacketList([], "Sent") - debug.match = plist.SndRcvList([]) + debug.recv = PacketList([], "Unanswered") + debug.sent = PacketList([], "Sent") + debug.match = SndRcvList([]) nbrecv = 0 ans = [] listable = (isinstance(pkt, Packet) and pkt.__iterlen__() == 1) or isinstance(pkt, list) # noqa: E501 @@ -241,8 +242,8 @@ def sndrcv(pks, pkt, timeout=None, inter=0, verbose=None, chainCC=False, retry -= 1 if conf.debug_match: - debug.sent = plist.PacketList(remain[:], "Sent") - debug.match = plist.SndRcvList(ans[:]) + debug.sent = PacketList(remain[:], "Sent") + debug.match = SndRcvList(ans[:]) # Clean the ans list to delete the field _answered if multi: @@ -256,8 +257,8 @@ def sndrcv(pks, pkt, timeout=None, inter=0, verbose=None, chainCC=False, if store_unanswered and use_prn_mode: remain = [process(x, None) for x in remain] - ans_result = ans if use_prn_mode else plist.SndRcvList(ans) - unans_result = remain if use_prn_mode else (None if not store_unanswered else plist.PacketList(remain, "Unanswered")) # noqa: E501 + ans_result = ans if use_prn_mode else SndRcvList(ans) + unans_result = remain if use_prn_mode else (None if not store_unanswered else PacketList(remain, "Unanswered")) # noqa: E501 return ans_result, unans_result @@ -274,7 +275,7 @@ def __gen_send(s, x, inter=0, loop=0, count=None, verbose=None, realtime=None, r elif not loop: loop = -1 if return_packets: - sent_packets = plist.PacketList() + sent_packets = PacketList() try: while loop: dt0 = None @@ -612,7 +613,7 @@ def __sr_loop(srfunc, pkts, prn=lambda x: x[1].summary(), prnfail=lambda x: x.su if verbose and n > 0: print(ct.normal("\nSent %i packets, received %i packets. %3.1f%% hits." % (n, r, 100.0 * r / n))) # noqa: E501 - return plist.SndRcvList(ans), plist.PacketList(unans) + return SndRcvList(ans), PacketList(unans) @conf.commands.register @@ -705,8 +706,8 @@ def _timeout(timeout): count_packets.empty() del count_packets - ans_result = ans if use_prn_mode else plist.SndRcvList(ans) - unans_result = remain if use_prn_mode else (None if not store_unanswered else plist.PacketList(remain, "Unanswered")) # noqa: E501 + ans_result = ans if use_prn_mode else SndRcvList(ans) + unans_result = remain if use_prn_mode else (None if not store_unanswered else PacketList(remain, "Unanswered")) # noqa: E501 return ans_result, unans_result @@ -779,7 +780,8 @@ def srp1flood(x, promisc=None, filter=None, iface=None, nofilter=0, *args, **kar @conf.commands.register def sniff(count=0, store=True, offline=None, prn=None, lfilter=None, L2socket=None, timeout=None, opened_socket=None, - stop_filter=None, iface=None, started_callback=None, *arg, **karg): + stop_filter=None, iface=None, started_callback=None, + session=None, *arg, **karg): """Sniff packets and return a list of packets. Args: @@ -788,6 +790,8 @@ def sniff(count=0, store=True, offline=None, prn=None, lfilter=None, prn: function to apply to each packet. If something is returned, it is displayed. --Ex: prn = lambda x: x.summary() + session: a session = a flow decoder used to handle stream of packets. + e.g: IPSession (to defragment on-the-flow) or NetflowSession filter: BPF filter to apply. lfilter: Python function applied to each packet to determine if further action may be done. @@ -813,6 +817,9 @@ def sniff(count=0, store=True, offline=None, prn=None, lfilter=None, Examples: >>> sniff(filter="arp") + >>> sniff(filter="tcp", + ... session=IPSession, # defragment on-the-flow + ... prn=lambda x: x.summary()) >>> sniff(lfilter=lambda pkt: ARP in pkt) >>> sniff(iface="eth0", prn=Packet.summary) >>> sniff(iface=["eth0", "mon0"], @@ -823,6 +830,8 @@ def sniff(count=0, store=True, offline=None, prn=None, lfilter=None, ... pkt.summary())) """ c = 0 + session = session or DefaultSession + session = session(prn, store) # instantiate session sniff_sockets = {} # socket: label dict if opened_socket is not None: if isinstance(opened_socket, list): @@ -866,7 +875,6 @@ def sniff(count=0, store=True, offline=None, prn=None, lfilter=None, else: sniff_sockets[L2socket(type=ETH_P_ALL, iface=iface, *arg, **karg)] = iface - lst = [] if timeout is not None: stoptime = time.time() + timeout remain = None @@ -911,13 +919,9 @@ def sniff(count=0, store=True, offline=None, prn=None, lfilter=None, if lfilter and not lfilter(p): continue p.sniffed_on = sniff_sockets[s] - if store: - lst.append(p) c += 1 - if prn: - r = prn(p) - if r is not None: - print(r) + # on_packet_received handles the prn/storage + session.on_packet_received(p) if stop_filter and stop_filter(p): sniff_sockets = [] break @@ -929,7 +933,7 @@ def sniff(count=0, store=True, offline=None, prn=None, lfilter=None, if opened_socket is None: for s in sniff_sockets: s.close() - return plist.PacketList(lst, "Sniffed") + return session.toPacketList() @conf.commands.register diff --git a/scapy/sessions.py b/scapy/sessions.py new file mode 100644 index 00000000000..2a7339c67e7 --- /dev/null +++ b/scapy/sessions.py @@ -0,0 +1,77 @@ +# This file is part of Scapy +# See http://www.secdev.org/projects/scapy for more information +# Copyright (C) Philippe Biondi +# This program is published under a GPLv2 license + +""" +Sessions: decode flow of packets when sniffing +""" + +from collections import defaultdict +from scapy.plist import PacketList +from scapy.compat import raw + + +class DefaultSession(object): + """Default session: no stream decoding""" + + def __init__(self, prn, store): + self.prn = prn + self.store = store + self.lst = [] + + def toPacketList(self): + return PacketList(self.lst, "Sniffed") + + def on_packet_received(self, pkt): + """DEV: entry point. Will be called by sniff() for each + received packet (that passes the filters). + """ + if not pkt: + return + if self.store: + self.lst.append(pkt) + if self.prn: + result = self.prn(pkt) + if result is not None: + print(result) + + +class IPSession(DefaultSession): + """Defragment IP packets 'on-the-flow'. + Usage: + >>> sniff(session=IPSession) + """ + + def __init__(self, *args): + DefaultSession.__init__(self, *args) + self.fragments = defaultdict(lambda: []) + + def _ip_process_packet(self, packet): + from scapy.layers.inet import _defrag_list, IP + if IP not in packet: + return packet + ip = packet[IP] + packet._defrag_pos = 0 + if ip.frag != 0 or ip.flags.MF: + uniq = (ip.id, ip.src, ip.dst, ip.proto) + self.fragments[uniq].append(packet) + if ip.flags.MF == 0: # end of frag + try: + if self.fragments[uniq][0].frag == 0: + # Has first fragment (otherwise ignore) + defrag, missfrag = [], [] + _defrag_list(self.fragments[uniq], defrag, missfrag) + defragmented_packet = defrag[0] + defragmented_packet = defragmented_packet.__class__( + raw(defragmented_packet) + ) + return defragmented_packet + finally: + del self.fragments[uniq] + else: + return packet + + def on_packet_received(self, pkt): + pkt = self._ip_process_packet(pkt) + DefaultSession.on_packet_received(self, pkt)