Skip to content

Commit

Permalink
Add IP/Netflow session to defragment packets on the fly
Browse files Browse the repository at this point in the history
  • Loading branch information
gpotter2 committed Feb 24, 2019
1 parent b1b1fc8 commit b124e22
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 158 deletions.
1 change: 1 addition & 0 deletions scapy/all.py
Expand Up @@ -26,6 +26,7 @@
from scapy.utils import *
from scapy.route import *
from scapy.sendrecv import *
from scapy.sessions import *
from scapy.supersocket import *
from scapy.volatile import *
from scapy.as_resolvers import *
Expand Down
79 changes: 42 additions & 37 deletions scapy/layers/inet.py
Expand Up @@ -1003,6 +1003,46 @@ def overlap_frag(p, overlap, fragsize=8, overlap_fragsize=None):
return qfrag + fragment(p, fragsize)


def _defrag_list(lst, defrag, missfrag):
    """Internal usage only. Part of the _defrag_logic.

    Reassemble one group of IP fragments (same id/src/dst/proto,
    already sorted by fragment offset). On success, the rebuilt packet
    is appended to ``defrag``; if a fragment is missing or overlaps,
    the untouched fragment list is appended to ``missfrag`` instead.

    :param lst: fragments of a single datagram, sorted by ``frag``
    :param defrag: output list receiving reassembled packets
    :param missfrag: output list receiving fragment lists that could
        not be reassembled
    """
    p = lst[0]
    lastp = lst[-1]
    # frag > 0 on the first packet means the offset-0 fragment is absent;
    # MF (flags & 1) still set on the last packet means the tail is absent.
    if p.frag > 0 or lastp.flags & 1 != 0:  # first or last fragment missing  # noqa: E501
        missfrag.append(lst)
        return
    p = p.copy()
    if conf.padding_layer in p:
        # Strip capture padding so it is not glued into the reassembled payload
        del(p[conf.padding_layer].underlayer.payload)
    ip = p[IP]
    if ip.len is None or ip.ihl is None:
        # Fields unset (e.g. locally forged packet): use the actual payload size
        clen = len(ip.payload)
    else:
        # Payload length as advertised by the header: total length - header size
        clen = ip.len - (ip.ihl << 2)
    txt = conf.raw_layer()
    for q in lst[1:]:
        if clen != q.frag << 3:  # Wrong fragmentation offset
            if clen > q.frag << 3:
                warning("Fragment overlap (%i > %i) %r || %r || %r" % (clen, q.frag << 3, p, txt, q))  # noqa: E501
            missfrag.append(lst)
            break
        if q[IP].len is None or q[IP].ihl is None:
            clen += len(q[IP].payload)
        else:
            clen += q[IP].len - (q[IP].ihl << 2)
        if conf.padding_layer in q:
            del(q[conf.padding_layer].underlayer.payload)
        txt.add_payload(q[IP].payload.copy())
        if q.time > p.time:
            # Keep the timestamp of the most recent fragment
            p.time = q.time
    else:
        # for/else: no break occurred, so every fragment was contiguous.
        ip.flags &= ~1  # !MF
        # Invalidate checksum and length so they are recomputed on build
        del(ip.chksum)
        del(ip.len)
        p = p / txt
        # _defrag_pos: position of the last fragment in the original plist,
        # used by callers to reinsert the packet at the right place
        p._defrag_pos = max(x._defrag_pos for x in lst)
        defrag.append(p)


def _defrag_logic(plist, complete=False):
"""Internal function used to defragment a list of packets.
It contains the logic behind the defrag() and defragment() functions
Expand All @@ -1015,7 +1055,7 @@ def _defrag_logic(plist, complete=False):
pos += 1
if IP in p:
ip = p[IP]
if ip.frag != 0 or ip.flags & 1:
if ip.frag != 0 or ip.flags.MF:
uniq = (ip.id, ip.src, ip.dst, ip.proto)
frags[uniq].append(p)
continue
Expand All @@ -1025,42 +1065,7 @@ def _defrag_logic(plist, complete=False):
missfrag = []
for lst in six.itervalues(frags):
lst.sort(key=lambda x: x.frag)
p = lst[0]
lastp = lst[-1]
if p.frag > 0 or lastp.flags & 1 != 0: # first or last fragment missing # noqa: E501
missfrag.append(lst)
continue
p = p.copy()
if conf.padding_layer in p:
del(p[conf.padding_layer].underlayer.payload)
ip = p[IP]
if ip.len is None or ip.ihl is None:
clen = len(ip.payload)
else:
clen = ip.len - (ip.ihl << 2)
txt = conf.raw_layer()
for q in lst[1:]:
if clen != q.frag << 3: # Wrong fragmentation offset
if clen > q.frag << 3:
warning("Fragment overlap (%i > %i) %r || %r || %r" % (clen, q.frag << 3, p, txt, q)) # noqa: E501
missfrag.append(lst)
break
if q[IP].len is None or q[IP].ihl is None:
clen += len(q[IP].payload)
else:
clen += q[IP].len - (q[IP].ihl << 2)
if conf.padding_layer in q:
del(q[conf.padding_layer].underlayer.payload)
txt.add_payload(q[IP].payload.copy())
if q.time > p.time:
p.time = q.time
else:
ip.flags &= ~1 # !MF
del(ip.chksum)
del(ip.len)
p = p / txt
p._defrag_pos = max(x._defrag_pos for x in lst)
defrag.append(p)
_defrag_list(lst, defrag, missfrag)
defrag2 = []
for p in defrag:
q = p.__class__(raw(p))
Expand Down
237 changes: 137 additions & 100 deletions scapy/layers/netflow.py
Expand Up @@ -8,10 +8,14 @@
"""
Cisco NetFlow protocol v1, v5 and v9
HowTo debug NetflowV9 packets:
HowTo dissect NetflowV9 packets:
# From a pcap
- get a list of packets containing NetflowV9 packets
- call netflowv9_defragment(plist) to defragment the list
Caution: this API might be updated
- call `netflowv9_defragment(plist)` to defragment the list
# Live / on-the-flow:
>>> sniff(session=NetflowSession, prn=[...])
"""

import struct
Expand All @@ -25,6 +29,7 @@
StrField, StrFixedLenField, ThreeBytesField, UTCTimeField, XByteField, \
XShortField
from scapy.packet import Packet, bind_layers, bind_bottom_up
from scapy.sessions import IPSession, DefaultSession

from scapy.layers.inet import UDP
from scapy.layers.inet6 import IP6Field
Expand All @@ -35,8 +40,10 @@ class NetflowHeader(Packet):
fields_desc = [ShortField("version", 1)]


bind_bottom_up(UDP, NetflowHeader, dport=2055)
bind_bottom_up(UDP, NetflowHeader, sport=2055)
for port in [2055, 2056, 9995, 9996, 6343]: # Classic NetFlow ports
bind_bottom_up(UDP, NetflowHeader, dport=port)
bind_bottom_up(UDP, NetflowHeader, sport=port)
# However, we'll default to 2055, classic among classics :)
bind_layers(UDP, NetflowHeader, dport=2055, sport=2055)

###########################################
Expand Down Expand Up @@ -431,6 +438,104 @@ def dispatch_hook(cls, _pkt=None, *args, **kargs):
return cls


def _netflowv9_defragment_packet(pkt, definitions, definitions_opts, ignored):
    """Used internally to process a single packet during defragmenting.

    Template definitions found in the packet are recorded into
    ``definitions`` and ``definitions_opts`` (both mutated in place),
    then any NetflowDataflowsetV9 payload is re-dissected in place
    using the templates collected so far.

    :param pkt: the packet to process (modified in place)
    :param definitions: dict templateID -> (record length, record class)
    :param definitions_opts: dict templateID ->
        (scope length, scope class, option length, option class)
    :param ignored: set collecting templateIDs of data flowsets whose
        template definition has not been seen yet
    """
    # Dataflowset definitions
    if NetflowFlowsetV9 in pkt:
        current = pkt
        # A packet may stack several flowsets: walk them all
        while NetflowFlowsetV9 in current:
            current = current[NetflowFlowsetV9]
            for ntv9 in current.templates:
                llist = []
                for tmpl in ntv9.template_fields:
                    llist.append((tmpl.fieldLength, tmpl.fieldType))
                if llist:
                    # Build a dedicated record class matching this template
                    tot_len = sum(x[0] for x in llist)
                    cls = _GenNetflowRecordV9(NetflowRecordV9, llist)
                    definitions[ntv9.templateID] = (tot_len, cls)
            current = current.payload
    # Options definitions
    if NetflowOptionsFlowsetV9 in pkt:
        current = pkt
        while NetflowOptionsFlowsetV9 in current:
            current = current[NetflowOptionsFlowsetV9]
            # Load scopes
            llist = []
            for scope in current.scopes:
                llist.append((
                    scope.scopeFieldlength,
                    scope.scopeFieldType
                ))
            scope_tot_len = sum(x[0] for x in llist)
            scope_cls = _GenNetflowRecordV9(
                NetflowOptionsRecordScopeV9,
                llist
            )
            # Load options
            llist = []
            for opt in current.options:
                llist.append((
                    opt.optionFieldlength,
                    opt.optionFieldType
                ))
            option_tot_len = sum(x[0] for x in llist)
            option_cls = _GenNetflowRecordV9(
                NetflowOptionsRecordOptionV9,
                llist
            )
            # Storage
            definitions_opts[current.templateID] = (
                scope_tot_len, scope_cls,
                option_tot_len, option_cls
            )
            current = current.payload
    # Dissect flowsets
    if NetflowDataflowsetV9 in pkt:
        datafl = pkt[NetflowDataflowsetV9]
        tid = datafl.templateID
        if tid not in definitions and tid not in definitions_opts:
            # Template not (yet) known: remember it for the caller's warning
            ignored.add(tid)
            return
        # All data is stored in one record, awaiting to be split
        # If fieldValue is available, the record has not been
        # defragmented: pop it
        try:
            data = datafl.records[0].fieldValue
            datafl.records.pop(0)
        except (IndexError, AttributeError):
            # No raw record left: already defragmented (or empty)
            return
        res = []
        # Flowset record
        # Now, according to the flow/option data,
        # let's re-dissect NetflowDataflowsetV9
        if tid in definitions:
            tot_len, cls = definitions[tid]
            # Split the raw blob into fixed-size template records
            while len(data) >= tot_len:
                res.append(cls(data[:tot_len]))
                data = data[tot_len:]
            # Inject dissected data
            datafl.records = res
            datafl.do_dissect_payload(data)
        # Options
        elif tid in definitions_opts:
            (scope_len, scope_cls,
             option_len, option_cls) = definitions_opts[tid]
            # Dissect scopes
            if scope_len:
                res.append(scope_cls(data[:scope_len]))
            if option_len:
                res.append(
                    option_cls(data[scope_len:scope_len + option_len])
                )
            if len(data) > scope_len + option_len:
                # Trailing bytes are kept as padding
                res.append(
                    conf.padding_layer(data[scope_len + option_len:])
                )
            # Inject dissected data
            datafl.records = res
            datafl.name = "Netflow DataFlowSet V9 - OPTIONS"


def netflowv9_defragment(plist, verb=1):
"""Process all NetflowV9 Packets to match IDs of the DataFlowsets
with the Headers
Expand All @@ -444,106 +549,38 @@ def netflowv9_defragment(plist, verb=1):
definitions_opts = {}
ignored = set()
# Iterate through initial list
for pkt in plist: # NetflowDataflowsetV9:
# Dataflowset definitions
if NetflowFlowsetV9 in pkt:
current = pkt
while NetflowFlowsetV9 in current:
current = current[NetflowFlowsetV9]
for ntv9 in current.templates:
llist = []
for tmpl in ntv9.template_fields:
llist.append((tmpl.fieldLength, tmpl.fieldType))
if llist:
tot_len = sum(x[0] for x in llist)
cls = _GenNetflowRecordV9(NetflowRecordV9, llist)
definitions[ntv9.templateID] = (tot_len, cls)
current = current.payload
# Options definitions
if NetflowOptionsFlowsetV9 in pkt:
current = pkt
while NetflowOptionsFlowsetV9 in current:
current = current[NetflowOptionsFlowsetV9]
# Load scopes
llist = []
for scope in current.scopes:
llist.append((
scope.scopeFieldlength,
scope.scopeFieldType
))
scope_tot_len = sum(x[0] for x in llist)
scope_cls = _GenNetflowRecordV9(
NetflowOptionsRecordScopeV9,
llist
)
# Load options
llist = []
for opt in current.options:
llist.append((
opt.optionFieldlength,
opt.optionFieldType
))
option_tot_len = sum(x[0] for x in llist)
option_cls = _GenNetflowRecordV9(
NetflowOptionsRecordOptionV9,
llist
)
# Storage
definitions_opts[current.templateID] = (
scope_tot_len, scope_cls,
option_tot_len, option_cls
)
current = current.payload
# Dissect flowsets
if NetflowDataflowsetV9 in pkt:
datafl = pkt[NetflowDataflowsetV9]
tid = datafl.templateID
if tid not in definitions and tid not in definitions_opts:
ignored.add(tid)
continue
# All data is stored in one record, awaiting to be split
# If fieldValue is available, the record has not been
# defragmented: pop it
try:
data = datafl.records[0].fieldValue
datafl.records.pop(0)
except (IndexError, AttributeError):
continue
res = []
# Flowset record
# Now, according to the flow/option data,
# let's re-dissect NetflowDataflowsetV9
if tid in definitions:
tot_len, cls = definitions[tid]
while len(data) >= tot_len:
res.append(cls(data[:tot_len]))
data = data[tot_len:]
# Inject dissected data
datafl.records = res
datafl.do_dissect_payload(data)
# Options
elif tid in definitions_opts:
(scope_len, scope_cls,
option_len, option_cls) = definitions_opts[tid]
# Dissect scopes
if scope_len:
res.append(scope_cls(data[:scope_len]))
if option_len:
res.append(
option_cls(data[scope_len:scope_len + option_len])
)
if len(data) > scope_len + option_len:
res.append(
conf.padding_layer(data[scope_len + option_len:])
)
# Inject dissected data
datafl.records = res
datafl.name = "Netflow DataFlowSet V9 - OPTIONS"
for pkt in plist:
_netflowv9_defragment_packet(pkt,
definitions,
definitions_opts,
ignored)
if conf.verb >= 1 and ignored:
warning("Ignored templateIDs (missing): %s" % list(ignored))
return plist


class NetflowSession(IPSession):
    """Sniffing session that defragments NetflowV9 packets on the fly.

    Keeps the NetflowV9 template state (data templates, option
    templates, and unknown templateIDs) across packets so that data
    flowsets can be re-dissected as their templates arrive.
    """

    def __init__(self, *args):
        IPSession.__init__(self, *args)
        # Per-session NetflowV9 template state, fed by
        # _netflowv9_defragment_packet for each received packet.
        self.definitions = {}
        self.definitions_opts = {}
        self.ignored = set()

    def _process_packet(self, pkt):
        """Defragment a single NetflowV9 packet using the session state."""
        _netflowv9_defragment_packet(
            pkt,
            self.definitions,
            self.definitions_opts,
            self.ignored
        )
        return pkt

    def on_packet_received(self, pkt):
        # First, defragment IP if necessary, then handle NetflowV9
        # defragmentation before handing the packet to the default session
        pkt = self._process_packet(self._ip_process_packet(pkt))
        DefaultSession.on_packet_received(self, pkt)


class NetflowOptionsRecordScopeV9(NetflowRecordV9):
name = "Netflow Options Template Record V9 - Scope"

Expand Down

0 comments on commit b124e22

Please sign in to comment.