From 5a48d250bb360918217c0b21e2373f24b4e32b1e Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Tue, 2 Apr 2019 17:57:04 +0200 Subject: [PATCH 01/10] Fix DNS already-encoded detection & support unicode --- scapy/compat.py | 10 ++++++++- scapy/layers/dns.py | 50 ++++++++++++++++++++++++++++++--------------- test/regression.uts | 7 +++++++ 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/scapy/compat.py b/scapy/compat.py index 13ffdcef745..4726550fc49 100644 --- a/scapy/compat.py +++ b/scapy/compat.py @@ -34,10 +34,18 @@ def lambda_tuple_converter(func): if six.PY2: - bytes_encode = plain_str = str + plain_str = str chb = lambda x: x if isinstance(x, str) else chr(x) orb = ord + def bytes_encode(x): + """Ensure that the given object is bytes. + If the parameter is a packet, raw() should be preferred. + """ + if isinstance(x, six.text_type): + return x.encode('utf8') + return str(x) + def raw(x): """Builds a packet and returns its bytes representation. This function is and always be cross-version compatible""" diff --git a/scapy/layers/dns.py b/scapy/layers/dns.py index a06786511c8..22f98443395 100755 --- a/scapy/layers/dns.py +++ b/scapy/layers/dns.py @@ -104,6 +104,30 @@ def dns_get_str(s, pointer=0, pkt=None, _fullpacket=False): return name, pointer, bytes_left +def dns_encode(x, check_built=False): + """Encodes a bytes string into the DNS format + + :param x: the string + :param check_built: detect already-built strings and ignore them + :returns: the encoded bytes string + """ + if not x or x == b".": + return b"\x00" + + built = check_built + built &= b"." not in x + built &= ((x[-1] == b"\x00") or ((orb(x[-2]) & 0xc0)) == 0xc0) + if built: + # The value has already been processed. Do not process it again + return x + + # Truncate chunks that cannot be encoded (more than 63 bytes..) + x = b"".join(chb(len(y)) + y for y in (k[:63] for k in x.split(b"."))) + if x[-1:] != b"\x00": + x += b"\x00" + return x + + def DNSgetstr(*args, **kwargs): """Legacy function. Deprecated""" raise DeprecationWarning("DNSgetstr deprecated. Use dns_get_str instead") @@ -143,11 +167,10 @@ def possible_shortens(dat): yield dat.split(b".", x)[x] data = {} burned_data = 0 - dummy_dns = DNSStrField("", "") # Used for its i2m method for current, name, dat in field_gen(dns_pkt): for part in possible_shortens(dat): # Encode the data - encoded = dummy_dns.i2m(None, part) + encoded = dns_encode(part, check_built=True) if part not in data: # We have no occurrence of such data, let's store it as a # possible pointer for future strings. @@ -179,7 +202,7 @@ def possible_shortens(dat): # setfieldval edits the value of the field in the layer val = rep[0].getfieldval(rep[1]) assert val.endswith(ck) - kept_string = dummy_dns.i2m(None, val[:-len(ck)])[:-1] + kept_string = dns_encode(val[:-len(ck)], check_built=True)[:-1] new_val = kept_string + replace_pointer rep[0].setfieldval(rep[1], new_val) try: @@ -209,19 +232,14 @@ def h2i(self, pkt, x): return b"." return x - def i2m(self, pkt, x): - if any((orb(y) >= 0xc0) for y in x): - # The value has already been processed. Do not process it again - return x - - if not x or x == b".": - return b"\x00" + def any2i(self, pkt, x): + if isinstance(x, six.text_type): + if x and x[-1] != u".": + x += u"." + return StrField.any2i(self, pkt, x) - # Truncate chunks that cannot be encoded (more than 63 bytes..) - x = b"".join(chb(len(y)) + y for y in (k[:63] for k in x.split(b"."))) - if orb(x[-1]) != 0 and (orb(x[-2]) < 0xc0): - x += b"\x00" - return x + def i2m(self, pkt, x): + return dns_encode(x, check_built=True) def getfield(self, pkt, s): # Decode the compressed DNS message @@ -351,7 +369,7 @@ def i2m(self, pkt, s): if s: s = inet_pton(socket.AF_INET, s) elif pkt.type in [2, 3, 4, 5, 12]: # NS, MD, MF, CNAME, PTR - s = DNSStrField("", "").i2m(None, s) + s = dns_encode(x, check_built=True) elif pkt.type == 16: # TXT ret_s = b"" for text in s: diff --git a/test/regression.uts b/test/regression.uts index affde2a4cca..1f3e392a1b1 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -7260,6 +7260,13 @@ assert dns_get_str(b"\x04data\xc0\x01", 0, _fullpacket=True)[0] == b"data." s = b'\x00\x00\x84\x00\x00\x00\x00\x02\x00\x00\x00\x06\x0bGourmandise\x04_smb\x04_tcp\x05local\x00\x00!\x80\x01\x00\x00\x00x\x00\x14\x00\x00\x00\x00\x01\xbd\x0bGourmandise\xc0"\x0bGourmandise\x0b_afpovertcp\xc0\x1d\x00!\x80\x01\x00\x00\x00x\x00\x08\x00\x00\x00\x00\x02$\xc09\xc09\x00\x1c\x80\x01\x00\x00\x00x\x00\x10\xfe\x80\x00\x00\x00\x00\x00\x00\x00s#\x99\xca\xf7\xea\xdc\xc09\x00\x01\x80\x01\x00\x00\x00x\x00\x04\xc0\xa8\x01x\xc09\x00\x1c\x80\x01\x00\x00\x00x\x00\x10*\x01\xcb\x00\x0bD\x1f\x00\x18k\xb1\x99\x90\xdf\x84.\xc0\x0c\x00/\x80\x01\x00\x00\x00x\x00\t\xc0\x0c\x00\x05\x00\x00\x80\x00@\xc0G\x00/\x80\x01\x00\x00\x00x\x00\t\xc0G\x00\x05\x00\x00\x80\x00@\xc09\x00/\x80\x01\x00\x00\x00x\x00\x08\xc09\x00\x04@\x00\x00\x08' DNS(s) += Check non-unicode hostnames for DNS encoding +~ dns +s = bytes(DNSRR(rrname='a\u0080b.')) +assert s == b'\x04a\xc2\x80b\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x00' +s = raw(DNSRR(rrname=u'a\u0080')) +assert s == b'\x03a\xc2\x80\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x00' + = Layer binding * Test DestMACField & DestIPField From cb6e93b442506b45f884ac3af86e480b05537643 Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Tue, 2 Apr 2019 23:43:57 +0200 Subject: [PATCH 02/10] Performance documentation --- scapy/compat.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scapy/compat.py b/scapy/compat.py index 4726550fc49..94f09e3ca4e 100644 --- a/scapy/compat.py +++ b/scapy/compat.py @@ -62,6 +62,8 @@ def bytes_encode(x): """Ensure that the given object is bytes. If the parameter is a packet, raw() should be preferred. """ + # Why not merge with the Python 2 one you say ? + # because x.encode('utf8') is 30% slower if isinstance(x, str): return x.encode() return bytes(x) From 66fe84c047307e074df1826a36ebe1ccd497878d Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Thu, 4 Apr 2019 17:46:09 +0200 Subject: [PATCH 03/10] Ignore keywords case in UTscapy --- scapy/tools/UTscapy.py | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/scapy/tools/UTscapy.py b/scapy/tools/UTscapy.py index 5cd3b13d15d..90de6b6d8da 100644 --- a/scapy/tools/UTscapy.py +++ b/scapy/tools/UTscapy.py @@ -162,8 +162,9 @@ def __getitem__(self, item): def add_keywords(self, kws): if isinstance(kws, six.string_types): - kws = [kws] + kws = [kws.lower()] for kwd in kws: + kwd = kwd.lower() if kwd.startswith('-'): try: self.keywords.remove(kwd[1:]) @@ -408,28 +409,26 @@ def filter_tests_on_numbers(test_campaign, num): if ts.tests] -def filter_tests_keep_on_keywords(test_campaign, kw): +def _filter_tests_kw(test_campaign, kw, keep): def kw_match(lst, kw): - for k in lst: - if k in kw: - return True - return False + return any(k for k in lst if kw in k) if kw: + kw = kw.lower() + if keep: + cond = lambda x: x + else: + cond = lambda x: not x for ts in test_campaign: - ts.tests = [t for t in ts.tests if kw_match(t.keywords, kw)] + ts.tests = [t for t in ts.tests if cond(kw_match(t.keywords, kw))] -def filter_tests_remove_on_keywords(test_campaign, kw): - def kw_match(lst, kw): - for k in kw: - if k in lst: - return True - return False +def filter_tests_keep_on_keywords(test_campaign, kw): + return _filter_tests_kw(test_campaign, kw, True) - if kw: - for ts in test_campaign: - ts.tests = [t for t in ts.tests if not kw_match(t.keywords, kw)] + +def filter_tests_remove_on_keywords(test_campaign, kw): + return _filter_tests_kw(test_campaign, kw, False) def remove_empty_testsets(test_campaign): @@ -850,8 +849,8 @@ def main(): LOCAL = 1 if data.local else 0 NUM = data.num MODULES = data.modules - KW_OK = [data.kw_ok] - KW_KO = [data.kw_ko] + KW_OK.extend(data.kw_ok) + KW_KO.extend(data.kw_ko) try: FORMAT = Format.from_string(data.format) except KeyError as msg: @@ -876,13 +875,13 @@ def main(): elif opt == "-m": MODULES.append(optarg) elif opt == "-k": - KW_OK.append(optarg.split(",")) + KW_OK.extend(optarg.split(",")) elif opt == "-K": - KW_KO.append(optarg.split(",")) + KW_KO.extend(optarg.split(",")) # Discard Python3 tests when using Python2 if six.PY2: - KW_KO.append(["python3_only"]) + KW_KO.append("python3_only") if VERB > 2: print("### Booting scapy...", file=sys.stderr) From 270424fe781775a36d2c28776899ad8ed0bafe7e Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Thu, 4 Apr 2019 18:29:03 +0200 Subject: [PATCH 04/10] Remove RDataField --- scapy/layers/dns.py | 117 ++++++++++++++++++------------------ test/answering_machines.uts | 2 +- test/regression.uts | 9 +-- 3 files changed, 64 insertions(+), 64 deletions(-) diff --git a/scapy/layers/dns.py b/scapy/layers/dns.py index 22f98443395..03f5bb290c9 100755 --- a/scapy/layers/dns.py +++ b/scapy/layers/dns.py @@ -8,7 +8,6 @@ """ from __future__ import absolute_import -import socket import struct import time @@ -17,16 +16,15 @@ from scapy.fields import BitEnumField, BitField, ByteEnumField, ByteField, \ ConditionalField, Field, FieldLenField, FlagsField, IntField, \ PacketListField, ShortEnumField, ShortField, StrField, StrFixedLenField, \ - StrLenField + StrLenField, MultipleTypeField from scapy.compat import orb, raw, chb, bytes_encode from scapy.ansmachine import AnsweringMachine from scapy.sendrecv import sr1 -from scapy.layers.inet import IP, DestIPField, UDP, TCP -from scapy.layers.inet6 import DestIP6Field +from scapy.layers.inet import IP, DestIPField, IPField, UDP, TCP +from scapy.layers.inet6 import DestIP6Field, IP6Field from scapy.error import warning, Scapy_Exception import scapy.modules.six as six from scapy.modules.six.moves import range -from scapy.pton_ntop import inet_ntop, inet_pton def dns_get_str(s, pointer=0, pkt=None, _fullpacket=False): @@ -114,10 +112,9 @@ def dns_encode(x, check_built=False): if not x or x == b".": return b"\x00" - built = check_built - built &= b"." not in x - built &= ((x[-1] == b"\x00") or ((orb(x[-2]) & 0xc0)) == 0xc0) - if built: + if check_built and b"." not in x and ( + orb(x[-1]) == 0 or (orb(x[-2]) & 0xc0) == 0xc0 + ): # The value has already been processed. Do not process it again return x @@ -153,7 +150,7 @@ def field_gen(dns_pkt): if isinstance(current, InheritOriginDNSStrPacket): for field in current.fields_desc: if isinstance(field, DNSStrField) or \ - (isinstance(field, RDataField) and + (isinstance(field, MultipleTypeField) and current.type in [2, 5, 12]): # Get the associated data and store it accordingly # noqa: E501 dat = current.getfieldval(field.name) @@ -226,7 +223,7 @@ def __init__(self, _pkt=None, _orig_s=None, _orig_p=None, *args, **kwargs): Packet.__init__(self, _pkt=_pkt, *args, **kwargs) -class DNSStrField(StrField): +class DNSStrField(StrLenField): def h2i(self, pkt, x): if not x: return b"." @@ -242,9 +239,13 @@ def i2m(self, pkt, x): return dns_encode(x, check_built=True) def getfield(self, pkt, s): + remain = b"" + if self.length_from: + remain, s = StrLenField.getfield(self, pkt, s) # Decode the compressed DNS message - decoded, index, left = dns_get_str(s, 0, pkt) - return left, decoded + decoded, _, left = dns_get_str(s, 0, pkt) + # returns (remaining, decoded) + return left + remain, decoded class DNSRRCountField(ShortField): @@ -337,56 +338,35 @@ def decodeRR(self, name, s, p): return rr, p -class RDataField(StrLenField): +class DNSTextField(StrLenField): islist = 1 def m2i(self, pkt, s): - family = None - if pkt.type == 1: # A - family = socket.AF_INET - elif pkt.type in [2, 5, 12]: # NS, CNAME, PTR - s = dns_get_str(s, 0, pkt)[0] - elif pkt.type == 16: # TXT - ret_s = list() - tmp_s = s - # RDATA contains a list of strings, each are prepended with - # a byte containing the size of the following string. - while tmp_s: - tmp_len = orb(tmp_s[0]) + 1 - if tmp_len > len(tmp_s): - warning("DNS RR TXT prematured end of character-string (size=%i, remaining bytes=%i)" % (tmp_len, len(tmp_s))) # noqa: E501 - ret_s.append(tmp_s[1:tmp_len]) - tmp_s = tmp_s[tmp_len:] - s = ret_s - elif pkt.type == 28: # AAAA - family = socket.AF_INET6 - if family is not None: - s = inet_ntop(family, s) - return s + ret_s = list() + tmp_s = s + # RDATA contains a list of strings, each are prepended with + # a byte containing the size of the following string. + while tmp_s: + tmp_len = orb(tmp_s[0]) + 1 + if tmp_len > len(tmp_s): + warning("DNS RR TXT prematured end of character-string (size=%i, remaining bytes=%i)" % (tmp_len, len(tmp_s))) # noqa: E501 + ret_s.append(tmp_s[1:tmp_len]) + tmp_s = tmp_s[tmp_len:] + return ret_s def i2m(self, pkt, s): - if pkt.type == 1: # A - if s: - s = inet_pton(socket.AF_INET, s) - elif pkt.type in [2, 3, 4, 5, 12]: # NS, MD, MF, CNAME, PTR - s = dns_encode(x, check_built=True) - elif pkt.type == 16: # TXT - ret_s = b"" - for text in s: - text = bytes_encode(text) - # The initial string must be split into a list of strings - # prepended with theirs sizes. - while len(text) >= 255: - ret_s += b"\xff" + text[:255] - text = text[255:] - # The remaining string is less than 255 bytes long - if len(text): - ret_s += struct.pack("!B", len(text)) + text - s = ret_s - elif pkt.type == 28: # AAAA - if s: - s = inet_pton(socket.AF_INET6, s) - return s + ret_s = b"" + for text in s: + text = bytes_encode(text) + # The initial string must be split into a list of strings + # prepended with theirs sizes. + while len(text) >= 255: + ret_s += b"\xff" + text[:255] + text = text[255:] + # The remaining string is less than 255 bytes long + if len(text): + ret_s += struct.pack("!B", len(text)) + text + return ret_s class RDLenField(Field): @@ -901,7 +881,26 @@ class DNSRR(InheritOriginDNSStrPacket): ShortEnumField("rclass", 1, dnsclasses), IntField("ttl", 0), RDLenField("rdlen"), - RDataField("rdata", "", length_from=lambda pkt:pkt.rdlen)] + MultipleTypeField( + [ + # A + (IPField("rdata", "0.0.0.0"), + lambda pkt: pkt.type == 1), + # AAAA + (IP6Field("rdata", "::"), + lambda pkt: pkt.type == 28), + # NS, MD, MF, CNAME, PTR + (DNSStrField("rdata", "", + length_from=lambda pkt: pkt.rdlen), + lambda pkt: pkt.type in [2, 3, 4, 5, 12]), + # TEXT + (DNSTextField("rdata", [], + length_from=lambda pkt: pkt.rdlen), + lambda pkt: pkt.type == 16), + ], + StrLenField("rdata", "", + length_from=lambda pkt:pkt.rdlen) + )] bind_layers(UDP, DNS, dport=5353) diff --git a/test/answering_machines.uts b/test/answering_machines.uts index 67eaf54dc96..bdc69e47f77 100644 --- a/test/answering_machines.uts +++ b/test/answering_machines.uts @@ -54,7 +54,7 @@ test_am(ARP_am, = DNS_am def check_DNS_am_reply(packet): assert(DNS in packet and packet[DNS].ancount == 1) - assert(packet[DNS].an.rdata == b"192.168.1.1") + assert(packet[DNS].an.rdata == "192.168.1.1") test_am(DNS_am, IP()/UDP()/DNS(qd=DNSQR(qname="www.secdev.org")), diff --git a/test/regression.uts b/test/regression.uts index 1f3e392a1b1..f5c897178e3 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -7262,10 +7262,11 @@ DNS(s) = Check non-unicode hostnames for DNS encoding ~ dns -s = bytes(DNSRR(rrname='a\u0080b.')) -assert s == b'\x04a\xc2\x80b\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x00' -s = raw(DNSRR(rrname=u'a\u0080')) -assert s == b'\x03a\xc2\x80\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x00' +s = bytes(DNSRR(rrname=u'a\u0080b.')) +assert s == b'\x04a\xc2\x80b\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00' + +s = bytes(DNSRR(rrname=u'a\u0080')) +assert s == b'\x03a\xc2\x80\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00' = Layer binding From 35e9b310c49d48be5c2f043dacc9f56f60d70d7e Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Fri, 5 Apr 2019 01:46:54 +0200 Subject: [PATCH 05/10] Remove outdated fields --- scapy/layers/dns.py | 64 ++++++++++++--------------------------------- 1 file changed, 16 insertions(+), 48 deletions(-) diff --git a/scapy/layers/dns.py b/scapy/layers/dns.py index 03f5bb290c9..a56440805d4 100755 --- a/scapy/layers/dns.py +++ b/scapy/layers/dns.py @@ -14,9 +14,9 @@ from scapy.config import conf from scapy.packet import Packet, bind_layers, NoPayload from scapy.fields import BitEnumField, BitField, ByteEnumField, ByteField, \ - ConditionalField, Field, FieldLenField, FlagsField, IntField, \ + ConditionalField, FieldLenField, FlagsField, IntField, \ PacketListField, ShortEnumField, ShortField, StrField, StrFixedLenField, \ - StrLenField, MultipleTypeField + StrLenField, MultipleTypeField, UTCTimeField from scapy.compat import orb, raw, chb, bytes_encode from scapy.ansmachine import AnsweringMachine from scapy.sendrecv import sr1 @@ -238,6 +238,9 @@ def any2i(self, pkt, x): def i2m(self, pkt, x): return dns_encode(x, check_built=True) + def i2len(self, pkt, x): + return len(self.i2m(pkt, x)) + def getfield(self, pkt, s): remain = b"" if self.length_from: @@ -290,16 +293,12 @@ def i2m(self, pkt, x): def decodeRR(self, name, s, p): ret = s[p:p + 10] - type, cls, ttl, rdlen = struct.unpack("!HHIH", ret) + typ, cls, ttl, rdlen = struct.unpack("!HHIH", ret) p += 10 - rr = DNSRR(b"\x00" + ret + s[p:p + rdlen], _orig_s=s, _orig_p=p) - if type in [2, 3, 4, 5]: - rr.rdata = dns_get_str(s, p, _fullpacket=True)[0] - del(rr.rdlen) - elif type in DNSRR_DISPATCHER: - rr = DNSRR_DISPATCHER[type](b"\x00" + ret + s[p:p + rdlen], _orig_s=s, _orig_p=p) # noqa: E501 - else: - del(rr.rdlen) + cls = DNSRR_DISPATCHER.get(typ, DNSRR) + rr = cls(b"\x00" + ret + s[p:p + rdlen], _orig_s=s, _orig_p=p) + # Will have changed because of decompression + rr.rdlen = None rr.rrname = name p += rdlen @@ -354,6 +353,9 @@ def m2i(self, pkt, s): tmp_s = tmp_s[tmp_len:] return ret_s + def i2len(self, pkt, x): + return len(self.i2m(pkt, x)) + def i2m(self, pkt, s): ret_s = b"" for text in s: @@ -369,23 +371,6 @@ def i2m(self, pkt, s): return ret_s -class RDLenField(Field): - def __init__(self, name): - Field.__init__(self, name, None, "H") - - def i2m(self, pkt, x): - if x is None: - rdataf = pkt.get_field("rdata") - x = len(rdataf.i2m(pkt, pkt.rdata)) - return x - - def i2h(self, pkt, x): - if x is None: - rdataf = pkt.get_field("rdata") - x = len(rdataf.i2m(pkt, pkt.rdata)) - return x - - class DNS(Packet): name = "DNS" fields_desc = [ @@ -517,23 +502,6 @@ class DNSRROPT(InheritOriginDNSStrPacket): dnssecdigesttypes = {0: "Reserved", 1: "SHA-1", 2: "SHA-256", 3: "GOST R 34.11-94", 4: "SHA-384"} # noqa: E501 -class TimeField(IntField): - - def any2i(self, pkt, x): - if isinstance(x, str): - import time - import calendar - t = time.strptime(x, "%Y%m%d%H%M%S") - return int(calendar.timegm(t)) - return x - - def i2repr(self, pkt, x): - import time - x = self.i2h(pkt, x) - t = time.strftime("%Y%m%d%H%M%S", time.gmtime(x)) - return "%s (%d)" % (t, x) - - def bitmap2RRlist(bitmap): """ Decode the 'Type Bit Maps' field of the NSEC Resource Record into an @@ -681,8 +649,8 @@ class DNSRRRSIG(_DNSRRdummy): ByteEnumField("algorithm", 5, dnssecalgotypes), ByteField("labels", 0), IntField("originalttl", 0), - TimeField("expiration", 0), - TimeField("inception", 0), + UTCTimeField("expiration", 0), + UTCTimeField("inception", 0), ShortField("keytag", 0), DNSStrField("signersname", ""), StrField("signature", "") @@ -880,7 +848,7 @@ class DNSRR(InheritOriginDNSStrPacket): ShortEnumField("type", 1, dnstypes), ShortEnumField("rclass", 1, dnsclasses), IntField("ttl", 0), - RDLenField("rdlen"), + FieldLenField("rdlen", None, length_of="rdata", fmt="H"), MultipleTypeField( [ # A From 2c2f6a43d9d44fdc7e636065d5b532c2d92dc860 Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Fri, 5 Apr 2019 02:08:40 +0200 Subject: [PATCH 06/10] Codacy fixes --- scapy/fields.py | 4 ++-- scapy/layers/dns.py | 13 ++++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/scapy/fields.py b/scapy/fields.py index 78b0ff71660..4ba446805d3 100644 --- a/scapy/fields.py +++ b/scapy/fields.py @@ -919,8 +919,8 @@ def __init__(self, name, default, fmt="H", remain=0): Field.__init__(self, name, default, fmt) self.remain = remain - def i2len(self, pkt, i): - return len(i) + def i2len(self, pkt, x): + return len(x) def any2i(self, pkt, x): if isinstance(x, six.text_type): diff --git a/scapy/layers/dns.py b/scapy/layers/dns.py index a56440805d4..bda5929c97c 100755 --- a/scapy/layers/dns.py +++ b/scapy/layers/dns.py @@ -224,6 +224,12 @@ def __init__(self, _pkt=None, _orig_s=None, _orig_p=None, *args, **kwargs): class DNSStrField(StrLenField): + """ + Special StrField that handles DNS encoding/decoding. + It will also handle DNS decompression. + (may be StrLenField if a length_from is passed), + """ + def h2i(self, pkt, x): if not x: return b"." @@ -293,7 +299,8 @@ def i2m(self, pkt, x): def decodeRR(self, name, s, p): ret = s[p:p + 10] - typ, cls, ttl, rdlen = struct.unpack("!HHIH", ret) + # type, cls, ttl, rdlen + typ, cls, _, rdlen = struct.unpack("!HHIH", ret) p += 10 cls = DNSRR_DISPATCHER.get(typ, DNSRR) rr = cls(b"\x00" + ret + s[p:p + rdlen], _orig_s=s, _orig_p=p) @@ -338,6 +345,10 @@ def decodeRR(self, name, s, p): class DNSTextField(StrLenField): + """ + Special StrLenField that handles DNS TEXT data (16) + """ + islist = 1 def m2i(self, pkt, s): From c9c192a7bde39824687f8aa14b55b16044a0f5fb Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Fri, 5 Apr 2019 13:01:09 +0200 Subject: [PATCH 07/10] Import Bunch and retry_test automatically --- scapy/tools/UTscapy.py | 16 +++++++++++----- test/edns0.uts | 3 +-- test/pipetool.uts | 3 --- test/regression.uts | 5 +---- test/usb.uts | 2 -- test/windows.uts | 2 -- 6 files changed, 13 insertions(+), 18 deletions(-) diff --git a/scapy/tools/UTscapy.py b/scapy/tools/UTscapy.py index 90de6b6d8da..e28d8d36311 100644 --- a/scapy/tools/UTscapy.py +++ b/scapy/tools/UTscapy.py @@ -411,7 +411,7 @@ def filter_tests_on_numbers(test_campaign, num): def _filter_tests_kw(test_campaign, kw, keep): def kw_match(lst, kw): - return any(k for k in lst if kw in k) + return any(k for k in lst if kw == k) if kw: kw = kw.lower() @@ -437,8 +437,8 @@ def remove_empty_testsets(test_campaign): #### RUN TEST ##### -def run_test(test, get_interactive_session, verb=3, ignore_globals=None): - test.output, res = get_interactive_session(test.test.strip(), ignore_globals=ignore_globals, verb=verb) +def run_test(test, get_interactive_session, verb=3, ignore_globals=None, my_globals=None): + test.output, res = get_interactive_session(test.test.strip(), ignore_globals=ignore_globals, verb=verb, my_globals=my_globals) test.result = "failed" try: if res is None or res: @@ -465,14 +465,20 @@ def run_test(test, get_interactive_session, verb=3, ignore_globals=None): #### RUN CAMPAIGN ##### +def import_UTscapy_tools(ses): + ses["retry_test"] = retry_test + ses["Bunch"] = Bunch + def run_campaign(test_campaign, get_interactive_session, verb=3, ignore_globals=None): # noqa: E501 passed = failed = 0 + scapy_ses = importlib.import_module(".all", "scapy").__dict__ + import_UTscapy_tools(scapy_ses) if test_campaign.preexec: - test_campaign.preexec_output = get_interactive_session(test_campaign.preexec.strip(), ignore_globals=ignore_globals)[0] + test_campaign.preexec_output = get_interactive_session(test_campaign.preexec.strip(), ignore_globals=ignore_globals, my_globals=scapy_ses)[0] try: for i, testset in enumerate(test_campaign): for j, t in enumerate(testset): - if run_test(t, get_interactive_session, verb): + if run_test(t, get_interactive_session, verb, my_globals=scapy_ses): passed += 1 else: failed += 1 diff --git a/test/edns0.uts b/test/edns0.uts index 41399694f46..33907f53e8d 100644 --- a/test/edns0.uts +++ b/test/edns0.uts @@ -63,7 +63,6 @@ raw(tlv) == b'\x00\x02\x00\x00' = NSID - Live test ~ netaccess -from scapy.tools.UTscapy import retry_test def _test(): old_debug_dissector = conf.debug_dissector conf.debug_dissector = False @@ -71,4 +70,4 @@ def _test(): conf.debug_dissector = old_debug_dissector len(r.ar) and DNSRROPT in r.ar and len(r.ar[DNSRROPT].rdata) and len([x for x in r.ar[DNSRROPT].rdata if x.optcode == 3]) -retry_test(_test) \ No newline at end of file +retry_test(_test) diff --git a/test/pipetool.uts b/test/pipetool.uts index 2b1f526c5f3..7db32f82f90 100644 --- a/test/pipetool.uts +++ b/test/pipetool.uts @@ -4,9 +4,6 @@ + Basic tests -= Import Bunch -from scapy.tools.UTscapy import Bunch - = Test default test case s = PeriodicSource("hello", 1, name="src") diff --git a/test/regression.uts b/test/regression.uts index f5c897178e3..2b4fb347f10 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -346,7 +346,6 @@ interact_emulator(extra_args=["-d"]) # Extended ~ command import mock -from scapy.tools.UTscapy import Bunch def test_explore_gui(is_layer, layer): prompt_toolkit_mocked_module = Bunch( @@ -372,6 +371,7 @@ def test_explore_gui(is_layer, layer): result_explore = cmco.get_output() return result_explore +conf.interactive = True explore_dns = test_explore_gui(True, "scapy.layers.dns") assert "DNS" in explore_dns assert "DNS Question Record" in explore_dns @@ -1255,9 +1255,6 @@ raw(RandASN1Object()) * Those tests need network access -= Import retry_test() -from scapy.tools.UTscapy import retry_test - = Sending and receiving an ICMP ~ netaccess IP ICMP def _test(): diff --git a/test/usb.uts b/test/usb.uts index 42e6047e08c..cc3a8af7532 100644 --- a/test/usb.uts +++ b/test/usb.uts @@ -39,7 +39,6 @@ assert raw(pkt) == b'\x1c\x00u\x925\x00\x00\x00\x00\x00\x00\x00\x00\x005\x12\n#\ ~ mock windows import mock -from scapy.tools.UTscapy import Bunch @mock.patch("scapy.layers.usb.subprocess.Popen") def test_get_usbpcap_interfaces(mock_Popen): @@ -57,7 +56,6 @@ test_get_usbpcap_interfaces() ~ mock windows import mock -from scapy.tools.UTscapy import Bunch @mock.patch("scapy.layers.usb.subprocess.Popen") def test_get_usbpcap_devices(mock_Popen): diff --git a/test/windows.uts b/test/windows.uts index 1e0f259178c..e5a9574b1c1 100644 --- a/test/windows.uts +++ b/test/windows.uts @@ -112,8 +112,6 @@ with conf.L3socket() as a: ~ netaccess needs_root require_gui % XXX currently disabled -from scapy.tools.UTscapy import retry_test - def _test(): answer = sr1(IP(dst="8.8.8.8")/UDP()/DNS(rd=1, qd=DNSQR(qname="www.google.com")), timeout=2) answer.show() From 91a8c30fedd75375408bf29fedd01e7daddfa7f5 Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Sat, 6 Apr 2019 01:25:05 +0200 Subject: [PATCH 08/10] Revert incomplete UTF8 DNS fix --- scapy/compat.py | 12 +----------- scapy/layers/dns.py | 6 ------ test/regression.uts | 8 -------- 3 files changed, 1 insertion(+), 25 deletions(-) diff --git a/scapy/compat.py b/scapy/compat.py index 94f09e3ca4e..13ffdcef745 100644 --- a/scapy/compat.py +++ b/scapy/compat.py @@ -34,18 +34,10 @@ def lambda_tuple_converter(func): if six.PY2: - plain_str = str + bytes_encode = plain_str = str chb = lambda x: x if isinstance(x, str) else chr(x) orb = ord - def bytes_encode(x): - """Ensure that the given object is bytes. - If the parameter is a packet, raw() should be preferred. - """ - if isinstance(x, six.text_type): - return x.encode('utf8') - return str(x) - def raw(x): """Builds a packet and returns its bytes representation. This function is and always be cross-version compatible""" @@ -62,8 +54,6 @@ def bytes_encode(x): """Ensure that the given object is bytes. If the parameter is a packet, raw() should be preferred. """ - # Why not merge with the Python 2 one you say ? - # because x.encode('utf8') is 30% slower if isinstance(x, str): return x.encode() return bytes(x) diff --git a/scapy/layers/dns.py b/scapy/layers/dns.py index bda5929c97c..3eb50fce00d 100755 --- a/scapy/layers/dns.py +++ b/scapy/layers/dns.py @@ -235,12 +235,6 @@ def h2i(self, pkt, x): return b"." return x - def any2i(self, pkt, x): - if isinstance(x, six.text_type): - if x and x[-1] != u".": - x += u"." - return StrField.any2i(self, pkt, x) - def i2m(self, pkt, x): return dns_encode(x, check_built=True) diff --git a/test/regression.uts b/test/regression.uts index 2b4fb347f10..1ad55fc9457 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -7257,14 +7257,6 @@ assert dns_get_str(b"\x04data\xc0\x01", 0, _fullpacket=True)[0] == b"data." s = b'\x00\x00\x84\x00\x00\x00\x00\x02\x00\x00\x00\x06\x0bGourmandise\x04_smb\x04_tcp\x05local\x00\x00!\x80\x01\x00\x00\x00x\x00\x14\x00\x00\x00\x00\x01\xbd\x0bGourmandise\xc0"\x0bGourmandise\x0b_afpovertcp\xc0\x1d\x00!\x80\x01\x00\x00\x00x\x00\x08\x00\x00\x00\x00\x02$\xc09\xc09\x00\x1c\x80\x01\x00\x00\x00x\x00\x10\xfe\x80\x00\x00\x00\x00\x00\x00\x00s#\x99\xca\xf7\xea\xdc\xc09\x00\x01\x80\x01\x00\x00\x00x\x00\x04\xc0\xa8\x01x\xc09\x00\x1c\x80\x01\x00\x00\x00x\x00\x10*\x01\xcb\x00\x0bD\x1f\x00\x18k\xb1\x99\x90\xdf\x84.\xc0\x0c\x00/\x80\x01\x00\x00\x00x\x00\t\xc0\x0c\x00\x05\x00\x00\x80\x00@\xc0G\x00/\x80\x01\x00\x00\x00x\x00\t\xc0G\x00\x05\x00\x00\x80\x00@\xc09\x00/\x80\x01\x00\x00\x00x\x00\x08\xc09\x00\x04@\x00\x00\x08' DNS(s) -= Check non-unicode hostnames for DNS encoding -~ dns -s = bytes(DNSRR(rrname=u'a\u0080b.')) -assert s == b'\x04a\xc2\x80b\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00' - -s = bytes(DNSRR(rrname=u'a\u0080')) -assert s == b'\x03a\xc2\x80\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00' - = Layer binding * Test DestMACField & DestIPField From 5cd2901481534936f3bdd63559d9e51e85534842 Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Sat, 6 Apr 2019 01:25:16 +0200 Subject: [PATCH 09/10] Minor field cleanup --- scapy/fields.py | 19 ++++++++----------- scapy/tools/UTscapy.py | 2 ++ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/scapy/fields.py b/scapy/fields.py index 4ba446805d3..996b8cfd7b6 100644 --- a/scapy/fields.py +++ b/scapy/fields.py @@ -1230,26 +1230,23 @@ def i2repr(self, pkt, x): return bytes_hex(x).decode() -class XStrLenField(StrLenField): - """ - StrLenField which value is printed as hexadecimal. - """ - +class _XStrField: def i2repr(self, pkt, x): if not x: return repr(x) return bytes_hex(x[:self.length_from(pkt)]).decode() -class XStrFixedLenField(StrFixedLenField): +class XStrLenField(StrLenField, XStrField): """ - StrFixedLenField which value is printed as hexadecimal. + StrLenField which value is printed as hexadecimal. """ - def i2repr(self, pkt, x): - if not x: - return repr(x) - return bytes_hex(x[:self.length_from(pkt)]).decode() + +class XStrFixedLenField(StrFixedLenField, XStrField): + """ + StrFixedLenField which value is printed as hexadecimal. + """ class StrLenFieldUtf16(StrLenField): diff --git a/scapy/tools/UTscapy.py b/scapy/tools/UTscapy.py index e28d8d36311..b4bf0838206 100644 --- a/scapy/tools/UTscapy.py +++ b/scapy/tools/UTscapy.py @@ -438,6 +438,7 @@ def remove_empty_testsets(test_campaign): #### RUN TEST ##### def run_test(test, get_interactive_session, verb=3, ignore_globals=None, my_globals=None): + """An internal UTScapy function to run a single test""" test.output, res = get_interactive_session(test.test.strip(), ignore_globals=ignore_globals, verb=verb, my_globals=my_globals) test.result = "failed" try: @@ -466,6 +467,7 @@ def run_test(test, get_interactive_session, verb=3, ignore_globals=None, my_glob #### RUN CAMPAIGN ##### def import_UTscapy_tools(ses): + """Adds UTScapy tools directly to a session""" ses["retry_test"] = retry_test ses["Bunch"] = Bunch From 7eb77e8c5f00a2961524f96a92e48556ae678557 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 18 Apr 2019 20:17:31 +0200 Subject: [PATCH 10/10] Fix _XStrLenField calls --- scapy/fields.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scapy/fields.py b/scapy/fields.py index 996b8cfd7b6..54fac043265 100644 --- a/scapy/fields.py +++ b/scapy/fields.py @@ -1230,20 +1230,20 @@ def i2repr(self, pkt, x): return bytes_hex(x).decode() -class _XStrField: +class _XStrLenField: def i2repr(self, pkt, x): if not x: return repr(x) return bytes_hex(x[:self.length_from(pkt)]).decode() -class XStrLenField(StrLenField, XStrField): +class XStrLenField(_XStrLenField, StrLenField): """ StrLenField which value is printed as hexadecimal. """ -class XStrFixedLenField(StrFixedLenField, XStrField): +class XStrFixedLenField(_XStrLenField, StrFixedLenField): """ StrFixedLenField which value is printed as hexadecimal. """