diff --git a/scapy/fields.py b/scapy/fields.py index 78b0ff71660..54fac043265 100644 --- a/scapy/fields.py +++ b/scapy/fields.py @@ -919,8 +919,8 @@ def __init__(self, name, default, fmt="H", remain=0): Field.__init__(self, name, default, fmt) self.remain = remain - def i2len(self, pkt, i): - return len(i) + def i2len(self, pkt, x): + return len(x) def any2i(self, pkt, x): if isinstance(x, six.text_type): @@ -1230,26 +1230,23 @@ def i2repr(self, pkt, x): return bytes_hex(x).decode() -class XStrLenField(StrLenField): - """ - StrLenField which value is printed as hexadecimal. - """ - +class _XStrLenField: def i2repr(self, pkt, x): if not x: return repr(x) return bytes_hex(x[:self.length_from(pkt)]).decode() -class XStrFixedLenField(StrFixedLenField): +class XStrLenField(_XStrLenField, StrLenField): """ - StrFixedLenField which value is printed as hexadecimal. + StrLenField which value is printed as hexadecimal. """ - def i2repr(self, pkt, x): - if not x: - return repr(x) - return bytes_hex(x[:self.length_from(pkt)]).decode() + +class XStrFixedLenField(_XStrLenField, StrFixedLenField): + """ + StrFixedLenField which value is printed as hexadecimal. + """ class StrLenFieldUtf16(StrLenField): diff --git a/scapy/layers/dns.py b/scapy/layers/dns.py index a06786511c8..3eb50fce00d 100755 --- a/scapy/layers/dns.py +++ b/scapy/layers/dns.py @@ -8,25 +8,23 @@ """ from __future__ import absolute_import -import socket import struct import time from scapy.config import conf from scapy.packet import Packet, bind_layers, NoPayload from scapy.fields import BitEnumField, BitField, ByteEnumField, ByteField, \ - ConditionalField, Field, FieldLenField, FlagsField, IntField, \ + ConditionalField, FieldLenField, FlagsField, IntField, \ PacketListField, ShortEnumField, ShortField, StrField, StrFixedLenField, \ - StrLenField + StrLenField, MultipleTypeField, UTCTimeField from scapy.compat import orb, raw, chb, bytes_encode from scapy.ansmachine import AnsweringMachine from scapy.sendrecv import sr1 -from scapy.layers.inet import IP, DestIPField, UDP, TCP -from scapy.layers.inet6 import DestIP6Field +from scapy.layers.inet import IP, DestIPField, IPField, UDP, TCP +from scapy.layers.inet6 import DestIP6Field, IP6Field from scapy.error import warning, Scapy_Exception import scapy.modules.six as six from scapy.modules.six.moves import range -from scapy.pton_ntop import inet_ntop, inet_pton def dns_get_str(s, pointer=0, pkt=None, _fullpacket=False): @@ -104,6 +102,29 @@ def dns_get_str(s, pointer=0, pkt=None, _fullpacket=False): return name, pointer, bytes_left +def dns_encode(x, check_built=False): + """Encodes a bytes string into the DNS format + + :param x: the string + :param check_built: detect already-built strings and ignore them + :returns: the encoded bytes string + """ + if not x or x == b".": + return b"\x00" + + if check_built and b"." not in x and ( + orb(x[-1]) == 0 or (orb(x[-2]) & 0xc0) == 0xc0 + ): + # The value has already been processed. Do not process it again + return x + + # Truncate chunks that cannot be encoded (more than 63 bytes..) + x = b"".join(chb(len(y)) + y for y in (k[:63] for k in x.split(b"."))) + if x[-1:] != b"\x00": + x += b"\x00" + return x + + def DNSgetstr(*args, **kwargs): """Legacy function. Deprecated""" raise DeprecationWarning("DNSgetstr deprecated. Use dns_get_str instead") @@ -129,7 +150,7 @@ def field_gen(dns_pkt): if isinstance(current, InheritOriginDNSStrPacket): for field in current.fields_desc: if isinstance(field, DNSStrField) or \ - (isinstance(field, RDataField) and + (isinstance(field, MultipleTypeField) and current.type in [2, 5, 12]): # Get the associated data and store it accordingly # noqa: E501 dat = current.getfieldval(field.name) @@ -143,11 +164,10 @@ def possible_shortens(dat): yield dat.split(b".", x)[x] data = {} burned_data = 0 - dummy_dns = DNSStrField("", "") # Used for its i2m method for current, name, dat in field_gen(dns_pkt): for part in possible_shortens(dat): # Encode the data - encoded = dummy_dns.i2m(None, part) + encoded = dns_encode(part, check_built=True) if part not in data: # We have no occurrence of such data, let's store it as a # possible pointer for future strings. @@ -179,7 +199,7 @@ def possible_shortens(dat): # setfieldval edits the value of the field in the layer val = rep[0].getfieldval(rep[1]) assert val.endswith(ck) - kept_string = dummy_dns.i2m(None, val[:-len(ck)])[:-1] + kept_string = dns_encode(val[:-len(ck)], check_built=True)[:-1] new_val = kept_string + replace_pointer rep[0].setfieldval(rep[1], new_val) try: @@ -203,30 +223,32 @@ def __init__(self, _pkt=None, _orig_s=None, _orig_p=None, *args, **kwargs): Packet.__init__(self, _pkt=_pkt, *args, **kwargs) -class DNSStrField(StrField): +class DNSStrField(StrLenField): + """ + Special StrField that handles DNS encoding/decoding. + It will also handle DNS decompression. + (may be StrLenField if a length_from is passed), + """ + def h2i(self, pkt, x): if not x: return b"." return x def i2m(self, pkt, x): - if any((orb(y) >= 0xc0) for y in x): - # The value has already been processed. Do not process it again - return x + return dns_encode(x, check_built=True) - if not x or x == b".": - return b"\x00" - - # Truncate chunks that cannot be encoded (more than 63 bytes..) - x = b"".join(chb(len(y)) + y for y in (k[:63] for k in x.split(b"."))) - if orb(x[-1]) != 0 and (orb(x[-2]) < 0xc0): - x += b"\x00" - return x + def i2len(self, pkt, x): + return len(self.i2m(pkt, x)) def getfield(self, pkt, s): + remain = b"" + if self.length_from: + remain, s = StrLenField.getfield(self, pkt, s) # Decode the compressed DNS message - decoded, index, left = dns_get_str(s, 0, pkt) - return left, decoded + decoded, _, left = dns_get_str(s, 0, pkt) + # returns (remaining, decoded) + return left + remain, decoded class DNSRRCountField(ShortField): @@ -271,16 +293,13 @@ def i2m(self, pkt, x): def decodeRR(self, name, s, p): ret = s[p:p + 10] - type, cls, ttl, rdlen = struct.unpack("!HHIH", ret) + # type, cls, ttl, rdlen + typ, cls, _, rdlen = struct.unpack("!HHIH", ret) p += 10 - rr = DNSRR(b"\x00" + ret + s[p:p + rdlen], _orig_s=s, _orig_p=p) - if type in [2, 3, 4, 5]: - rr.rdata = dns_get_str(s, p, _fullpacket=True)[0] - del(rr.rdlen) - elif type in DNSRR_DISPATCHER: - rr = DNSRR_DISPATCHER[type](b"\x00" + ret + s[p:p + rdlen], _orig_s=s, _orig_p=p) # noqa: E501 - else: - del(rr.rdlen) + cls = DNSRR_DISPATCHER.get(typ, DNSRR) + rr = cls(b"\x00" + ret + s[p:p + rdlen], _orig_s=s, _orig_p=p) + # Will have changed because of decompression + rr.rdlen = None rr.rrname = name p += rdlen @@ -319,73 +338,42 @@ def decodeRR(self, name, s, p): return rr, p -class RDataField(StrLenField): +class DNSTextField(StrLenField): + """ + Special StrLenField that handles DNS TEXT data (16) + """ + islist = 1 def m2i(self, pkt, s): - family = None - if pkt.type == 1: # A - family = socket.AF_INET - elif pkt.type in [2, 5, 12]: # NS, CNAME, PTR - s = dns_get_str(s, 0, pkt)[0] - elif pkt.type == 16: # TXT - ret_s = list() - tmp_s = s - # RDATA contains a list of strings, each are prepended with - # a byte containing the size of the following string. - while tmp_s: - tmp_len = orb(tmp_s[0]) + 1 - if tmp_len > len(tmp_s): - warning("DNS RR TXT prematured end of character-string (size=%i, remaining bytes=%i)" % (tmp_len, len(tmp_s))) # noqa: E501 - ret_s.append(tmp_s[1:tmp_len]) - tmp_s = tmp_s[tmp_len:] - s = ret_s - elif pkt.type == 28: # AAAA - family = socket.AF_INET6 - if family is not None: - s = inet_ntop(family, s) - return s + ret_s = list() + tmp_s = s + # RDATA contains a list of strings, each are prepended with + # a byte containing the size of the following string. + while tmp_s: + tmp_len = orb(tmp_s[0]) + 1 + if tmp_len > len(tmp_s): + warning("DNS RR TXT prematured end of character-string (size=%i, remaining bytes=%i)" % (tmp_len, len(tmp_s))) # noqa: E501 + ret_s.append(tmp_s[1:tmp_len]) + tmp_s = tmp_s[tmp_len:] + return ret_s + + def i2len(self, pkt, x): + return len(self.i2m(pkt, x)) def i2m(self, pkt, s): - if pkt.type == 1: # A - if s: - s = inet_pton(socket.AF_INET, s) - elif pkt.type in [2, 3, 4, 5, 12]: # NS, MD, MF, CNAME, PTR - s = DNSStrField("", "").i2m(None, s) - elif pkt.type == 16: # TXT - ret_s = b"" - for text in s: - text = bytes_encode(text) - # The initial string must be split into a list of strings - # prepended with theirs sizes. - while len(text) >= 255: - ret_s += b"\xff" + text[:255] - text = text[255:] - # The remaining string is less than 255 bytes long - if len(text): - ret_s += struct.pack("!B", len(text)) + text - s = ret_s - elif pkt.type == 28: # AAAA - if s: - s = inet_pton(socket.AF_INET6, s) - return s - - -class RDLenField(Field): - def __init__(self, name): - Field.__init__(self, name, None, "H") - - def i2m(self, pkt, x): - if x is None: - rdataf = pkt.get_field("rdata") - x = len(rdataf.i2m(pkt, pkt.rdata)) - return x - - def i2h(self, pkt, x): - if x is None: - rdataf = pkt.get_field("rdata") - x = len(rdataf.i2m(pkt, pkt.rdata)) - return x + ret_s = b"" + for text in s: + text = bytes_encode(text) + # The initial string must be split into a list of strings + # prepended with theirs sizes. + while len(text) >= 255: + ret_s += b"\xff" + text[:255] + text = text[255:] + # The remaining string is less than 255 bytes long + if len(text): + ret_s += struct.pack("!B", len(text)) + text + return ret_s class DNS(Packet): @@ -519,23 +507,6 @@ class DNSRROPT(InheritOriginDNSStrPacket): dnssecdigesttypes = {0: "Reserved", 1: "SHA-1", 2: "SHA-256", 3: "GOST R 34.11-94", 4: "SHA-384"} # noqa: E501 -class TimeField(IntField): - - def any2i(self, pkt, x): - if isinstance(x, str): - import time - import calendar - t = time.strptime(x, "%Y%m%d%H%M%S") - return int(calendar.timegm(t)) - return x - - def i2repr(self, pkt, x): - import time - x = self.i2h(pkt, x) - t = time.strftime("%Y%m%d%H%M%S", time.gmtime(x)) - return "%s (%d)" % (t, x) - - def bitmap2RRlist(bitmap): """ Decode the 'Type Bit Maps' field of the NSEC Resource Record into an @@ -683,8 +654,8 @@ class DNSRRRSIG(_DNSRRdummy): ByteEnumField("algorithm", 5, dnssecalgotypes), ByteField("labels", 0), IntField("originalttl", 0), - TimeField("expiration", 0), - TimeField("inception", 0), + UTCTimeField("expiration", 0), + UTCTimeField("inception", 0), ShortField("keytag", 0), DNSStrField("signersname", ""), StrField("signature", "") @@ -882,8 +853,27 @@ class DNSRR(InheritOriginDNSStrPacket): ShortEnumField("type", 1, dnstypes), ShortEnumField("rclass", 1, dnsclasses), IntField("ttl", 0), - RDLenField("rdlen"), - RDataField("rdata", "", length_from=lambda pkt:pkt.rdlen)] + FieldLenField("rdlen", None, length_of="rdata", fmt="H"), + MultipleTypeField( + [ + # A + (IPField("rdata", "0.0.0.0"), + lambda pkt: pkt.type == 1), + # AAAA + (IP6Field("rdata", "::"), + lambda pkt: pkt.type == 28), + # NS, MD, MF, CNAME, PTR + (DNSStrField("rdata", "", + length_from=lambda pkt: pkt.rdlen), + lambda pkt: pkt.type in [2, 3, 4, 5, 12]), + # TEXT + (DNSTextField("rdata", [], + length_from=lambda pkt: pkt.rdlen), + lambda pkt: pkt.type == 16), + ], + StrLenField("rdata", "", + length_from=lambda pkt:pkt.rdlen) + )] bind_layers(UDP, DNS, dport=5353) diff --git a/scapy/tools/UTscapy.py b/scapy/tools/UTscapy.py index 5cd3b13d15d..b4bf0838206 100644 --- a/scapy/tools/UTscapy.py +++ b/scapy/tools/UTscapy.py @@ -162,8 +162,9 @@ def __getitem__(self, item): def add_keywords(self, kws): if isinstance(kws, six.string_types): - kws = [kws] + kws = [kws.lower()] for kwd in kws: + kwd = kwd.lower() if kwd.startswith('-'): try: self.keywords.remove(kwd[1:]) @@ -408,28 +409,26 @@ def filter_tests_on_numbers(test_campaign, num): if ts.tests] -def filter_tests_keep_on_keywords(test_campaign, kw): +def _filter_tests_kw(test_campaign, kw, keep): def kw_match(lst, kw): - for k in lst: - if k in kw: - return True - return False + return any(k for k in lst if kw == k) if kw: + kw = kw.lower() + if keep: + cond = lambda x: x + else: + cond = lambda x: not x for ts in test_campaign: - ts.tests = [t for t in ts.tests if kw_match(t.keywords, kw)] + ts.tests = [t for t in ts.tests if cond(kw_match(t.keywords, kw))] -def filter_tests_remove_on_keywords(test_campaign, kw): - def kw_match(lst, kw): - for k in kw: - if k in lst: - return True - return False +def filter_tests_keep_on_keywords(test_campaign, kw): + return _filter_tests_kw(test_campaign, kw, True) - if kw: - for ts in test_campaign: - ts.tests = [t for t in ts.tests if not kw_match(t.keywords, kw)] + +def filter_tests_remove_on_keywords(test_campaign, kw): + return _filter_tests_kw(test_campaign, kw, False) def remove_empty_testsets(test_campaign): @@ -438,8 +437,9 @@ def remove_empty_testsets(test_campaign): #### RUN TEST ##### -def run_test(test, get_interactive_session, verb=3, ignore_globals=None): - test.output, res = get_interactive_session(test.test.strip(), ignore_globals=ignore_globals, verb=verb) +def run_test(test, get_interactive_session, verb=3, ignore_globals=None, my_globals=None): + """An internal UTScapy function to run a single test""" + test.output, res = get_interactive_session(test.test.strip(), ignore_globals=ignore_globals, verb=verb, my_globals=my_globals) test.result = "failed" try: if res is None or res: @@ -466,14 +466,21 @@ def run_test(test, get_interactive_session, verb=3, ignore_globals=None): #### RUN CAMPAIGN ##### +def import_UTscapy_tools(ses): + """Adds UTScapy tools directly to a session""" + ses["retry_test"] = retry_test + ses["Bunch"] = Bunch + def run_campaign(test_campaign, get_interactive_session, verb=3, ignore_globals=None): # noqa: E501 passed = failed = 0 + scapy_ses = importlib.import_module(".all", "scapy").__dict__ + import_UTscapy_tools(scapy_ses) if test_campaign.preexec: - test_campaign.preexec_output = get_interactive_session(test_campaign.preexec.strip(), ignore_globals=ignore_globals)[0] + test_campaign.preexec_output = get_interactive_session(test_campaign.preexec.strip(), ignore_globals=ignore_globals, my_globals=scapy_ses)[0] try: for i, testset in enumerate(test_campaign): for j, t in enumerate(testset): - if run_test(t, get_interactive_session, verb): + if run_test(t, get_interactive_session, verb, my_globals=scapy_ses): passed += 1 else: failed += 1 @@ -850,8 +857,8 @@ def main(): LOCAL = 1 if data.local else 0 NUM = data.num MODULES = data.modules - KW_OK = [data.kw_ok] - KW_KO = [data.kw_ko] + KW_OK.extend(data.kw_ok) + KW_KO.extend(data.kw_ko) try: FORMAT = Format.from_string(data.format) except KeyError as msg: @@ -876,13 +883,13 @@ def main(): elif opt == "-m": MODULES.append(optarg) elif opt == "-k": - KW_OK.append(optarg.split(",")) + KW_OK.extend(optarg.split(",")) elif opt == "-K": - KW_KO.append(optarg.split(",")) + KW_KO.extend(optarg.split(",")) # Discard Python3 tests when using Python2 if six.PY2: - KW_KO.append(["python3_only"]) + KW_KO.append("python3_only") if VERB > 2: print("### Booting scapy...", file=sys.stderr) diff --git a/test/answering_machines.uts b/test/answering_machines.uts index 67eaf54dc96..bdc69e47f77 100644 --- a/test/answering_machines.uts +++ b/test/answering_machines.uts @@ -54,7 +54,7 @@ test_am(ARP_am, = DNS_am def check_DNS_am_reply(packet): assert(DNS in packet and packet[DNS].ancount == 1) - assert(packet[DNS].an.rdata == b"192.168.1.1") + assert(packet[DNS].an.rdata == "192.168.1.1") test_am(DNS_am, IP()/UDP()/DNS(qd=DNSQR(qname="www.secdev.org")), diff --git a/test/edns0.uts b/test/edns0.uts index 41399694f46..33907f53e8d 100644 --- a/test/edns0.uts +++ b/test/edns0.uts @@ -63,7 +63,6 @@ raw(tlv) == b'\x00\x02\x00\x00' = NSID - Live test ~ netaccess -from scapy.tools.UTscapy import retry_test def _test(): old_debug_dissector = conf.debug_dissector conf.debug_dissector = False @@ -71,4 +70,4 @@ def _test(): conf.debug_dissector = old_debug_dissector len(r.ar) and DNSRROPT in r.ar and len(r.ar[DNSRROPT].rdata) and len([x for x in r.ar[DNSRROPT].rdata if x.optcode == 3]) -retry_test(_test) \ No newline at end of file +retry_test(_test) diff --git a/test/pipetool.uts b/test/pipetool.uts index 2b1f526c5f3..7db32f82f90 100644 --- a/test/pipetool.uts +++ b/test/pipetool.uts @@ -4,9 +4,6 @@ + Basic tests -= Import Bunch -from scapy.tools.UTscapy import Bunch - = Test default test case s = PeriodicSource("hello", 1, name="src") diff --git a/test/regression.uts b/test/regression.uts index affde2a4cca..1ad55fc9457 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -346,7 +346,6 @@ interact_emulator(extra_args=["-d"]) # Extended ~ command import mock -from scapy.tools.UTscapy import Bunch def test_explore_gui(is_layer, layer): prompt_toolkit_mocked_module = Bunch( @@ -372,6 +371,7 @@ def test_explore_gui(is_layer, layer): result_explore = cmco.get_output() return result_explore +conf.interactive = True explore_dns = test_explore_gui(True, "scapy.layers.dns") assert "DNS" in explore_dns assert "DNS Question Record" in explore_dns @@ -1255,9 +1255,6 @@ raw(RandASN1Object()) * Those tests need network access -= Import retry_test() -from scapy.tools.UTscapy import retry_test - = Sending and receiving an ICMP ~ netaccess IP ICMP def _test(): diff --git a/test/usb.uts b/test/usb.uts index 42e6047e08c..cc3a8af7532 100644 --- a/test/usb.uts +++ b/test/usb.uts @@ -39,7 +39,6 @@ assert raw(pkt) == b'\x1c\x00u\x925\x00\x00\x00\x00\x00\x00\x00\x00\x005\x12\n#\ ~ mock windows import mock -from scapy.tools.UTscapy import Bunch @mock.patch("scapy.layers.usb.subprocess.Popen") def test_get_usbpcap_interfaces(mock_Popen): @@ -57,7 +56,6 @@ test_get_usbpcap_interfaces() ~ mock windows import mock -from scapy.tools.UTscapy import Bunch @mock.patch("scapy.layers.usb.subprocess.Popen") def test_get_usbpcap_devices(mock_Popen): diff --git a/test/windows.uts b/test/windows.uts index 1e0f259178c..e5a9574b1c1 100644 --- a/test/windows.uts +++ b/test/windows.uts @@ -112,8 +112,6 @@ with conf.L3socket() as a: ~ netaccess needs_root require_gui % XXX currently disabled -from scapy.tools.UTscapy import retry_test - def _test(): answer = sr1(IP(dst="8.8.8.8")/UDP()/DNS(rd=1, qd=DNSQR(qname="www.google.com")), timeout=2) answer.show()