diff --git a/scapy/all.py b/scapy/all.py index 1c67e8b06ad..22b484ee6e5 100644 --- a/scapy/all.py +++ b/scapy/all.py @@ -43,6 +43,10 @@ from scapy.asn1.ber import * from scapy.asn1.mib import * +from scapy.cbor import * +from scapy.cborfields import * +from scapy.cborpacket import * + from scapy.pipetool import * from scapy.scapypipes import * diff --git a/scapy/cbor.py b/scapy/cbor.py new file mode 100644 index 00000000000..1f6dcc3ba95 --- /dev/null +++ b/scapy/cbor.py @@ -0,0 +1,508 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information +# Copyright (C) Brian Sipos + +""" +Classes that implement CBOR leaf data structures. +The current API is focused on definite-length strings and containers, but +does support indefinite-length variations. +""" + +from dataclasses import dataclass, field +import enum +import itertools +import struct +from typing import Dict, List, Optional, Tuple, Union + + +@enum.unique +class CborMajorType(enum.IntEnum): + ''' Major types defined in Section 3.1 of RFC 8949. ''' + + UINT = 0 + ''' Unsigned integer ''' + NINT = 1 + ''' Negative integer ''' + BSTR = 2 + ''' Byte string ''' + TSTR = 3 + ''' Text string ''' + ARRAY = 4 + ''' Array of items ''' + MAP = 5 + ''' Map from item to item ''' + TAG = 6 + ''' Global tag value ''' + OTHERS = 7 + ''' Floats and simple values ''' + + +@enum.unique +class CborSimpleValue(enum.IntEnum): + ''' The defined argument values smaller than 256 for CborMajorType.OTHERS ''' + FALSE = 20 + TRUE = 21 + NULL = 22 + UNDEFINED = 23 + + +CBOR_INDEF_BREAK = b'\xFF' +''' Encoded form of the indefinite break item. ''' + + +@dataclass(frozen=True) +class CborHead: + ''' The contents of a single CBOR head, which excludes the + contents of string types or item containers. + ''' + major: CborMajorType + ''' The major type from the first three bits of the head. ''' + argument: Union[int, bytes, None] + ''' The full argument value (unsigned integer), encoded float bytes, + or None to indicate additional info 31. + ''' + + +CBOR_HEAD_UNDEFINED = CborHead( + major=CborMajorType.OTHERS, + argument=CborSimpleValue.UNDEFINED.value +) +''' Static definition of the default undefined value. ''' + + +def cbor_encode_head(head: CborHead) -> bytearray: + ''' Encode a single CBOR head (without content). ''' + # mutable initial byte with major type + data = bytearray([int(head.major) << 5]) + + if isinstance(head.argument, int): + # normal unsigned arguments + if head.argument < 24: + addl = head.argument + arglen = 0 + elif head.argument < 2**8: + addl = 24 + arglen = 1 + elif head.argument < 2**16: + addl = 25 + arglen = 2 + elif head.argument < 2**32: + addl = 26 + arglen = 4 + elif head.argument < 2**64: + addl = 27 + arglen = 8 + else: + raise ValueError(f'invalid argument {head.argument}') + + if arglen: + data += head.argument.to_bytes(arglen, 'big') + + elif isinstance(head.argument, bytes): + # encoded floats + arglen = len(head.argument) + if arglen == 2: + addl = 25 + elif arglen == 4: + addl = 26 + elif arglen == 8: + addl = 27 + else: + raise ValueError(f'invalid argument {head.argument}') + + data += head.argument + + elif head.argument is None: + addl = 31 + + else: + raise ValueError(f'invalid argument {head.argument}') + + # back-write additional info bits + data[0] |= addl + + return data + + +def cbor_decode_head(data: bytearray) -> Tuple[int, Optional[CborHead]]: + ''' Decode a single CBOR head (without content). 
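+    Only the initial byte and any argument bytes are consumed; the content
+    of strings and containers is left in the buffer.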
+ + :param data: The data to read and slice off the used portion. + :return: A tuple of: the total size used and the decoded head object. + ''' + try: + init = int(data.pop(0)) + except (IndexError, TypeError): + return 0, None + used = 1 + major = CborMajorType(init >> 5) + addl = init & 0x1F + + if addl < 24: + arg = addl + elif 24 <= addl <= 27: + if addl == 24: + arglen = 1 + elif addl == 25: + arglen = 2 + elif addl == 26: + arglen = 4 + elif addl == 27: + arglen = 8 + + if major == CborMajorType.OTHERS: + # for encoded floats + arg = bytes(data[:arglen]) + else: + arg = int.from_bytes(data[:arglen], 'big') + + used += arglen + del data[:arglen] + elif 28 <= addl <= 30: + raise ValueError('Not well defined CBOR') + else: + # addl value 31 + arg = None + + head = CborHead( + major=major, + argument=arg + ) + return (used, head) + + +@dataclass(frozen=True) +class CborChunk: + ''' The direct attributes of a CBOR head along with any tags on that item. + This also contains a decoded semantically meaningful :py:attr:`content` + interpreted according to the following uses. + + Major types UINT and NINT have content of :py:cls:`int` representing + the decoded value. For UINT the content is identical to the head + :py:attr:`CborHead.argument`, for NINT the content is the actual negative + integer value. + + Major types BSTR and TSTR have content of :py:cls:`bytes` or :py:cls:`str` when + decoding is performed recursively (TSTR can use pre-utf8-encoded bytes). + + Major types ARRAY and MAP have content of child :py:cls:`CborChunk` objects + when decoding is performed recursively. + + Major type OTHERS has content of :py:cls:`CborSimpleValue` for specific + enumerated simple values or :py:cls:`float` for floating point values. + ''' + head: CborHead + ''' A non-tag head value for this item. ''' + tags: Tuple[int] = field(default_factory=tuple) + ''' Ordered list of tags from outer to inner for this item. ''' + content: Union[None, + int, bytes, str, CborSimpleValue, float, + List['CborChunk']] = None + ''' Optional semantic content beyond the argument values. ''' + + def is_break(self) -> bool: + ''' Identify a break item for any indefinite-length container. ''' + return self.head.major == CborMajorType.OTHERS and self.head.argument is None + + def __str__(self) -> str: + ''' Provide human-friendly representation inspired by + CBOR Extended Diagnostic Notation (EDN). + ''' + return 'CBOR({})'.format(self._diag()) + + def _diag(self) -> str: + ''' Internal recursive diagnostic notation for __str__. ''' + val = '' + match self.head.major: + case CborMajorType.UINT | CborMajorType.NINT: + val = "{}".format(self.content) + case CborMajorType.BSTR: + val = "h'{}'".format(self.content.hex()) + case CborMajorType.TSTR: + cnt = self.content + if isinstance(cnt, bytes): + cnt = cnt.decode('utf8') + val = '"{}"'.format(cnt) + case CborMajorType.ARRAY: + if self.content is not None: + val = '[{}]'.format(','.join(sub._diag() for sub in self.content)) + elif self.head.argument is None: + val = '[_' + else: + val = '[' + case CborMajorType.OTHERS: + match self.content: + case CborSimpleValue.FALSE: + val = 'false' + case CborSimpleValue.TRUE: + val = 'true' + case CborSimpleValue.NULL: + val = 'null' + case CborSimpleValue.UNDEFINED: + val = 'undefined' + if isinstance(self.content, float): + val = '{:e}'.format(self.content) + + return val + + +def cbor_chunk_int(val: int) -> CborChunk: + ''' Construct a consistent integer (possibly negative) value. 
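+    For example, cbor_chunk_int(-0x13) yields a chunk whose head is
+    CborHead(CborMajorType.NINT, 0x12) and whose content is -0x13.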
''' + if val >= 0: + major = CborMajorType.UINT + arg = val + else: + major = CborMajorType.NINT + arg = -1 - val + + return CborChunk( + head=CborHead(major=major, argument=arg), + content=val + ) + + +def cbor_chunk_tstr(val: str) -> CborChunk: + ''' Pre-encode and construct a consistent definite-length text string. ''' + data = val.encode('utf8') + return CborChunk( + head=CborHead(CborMajorType.TSTR, len(data)), + content=data + ) + + +def cbor_chunk_bstr(val: bytes) -> CborChunk: + ''' Construct a consistent definite-length byte string. ''' + val = bytes(val) + return CborChunk( + head=CborHead(CborMajorType.BSTR, len(val)), + content=val + ) + + +def cbor_chunk_array(val: List[CborChunk]) -> CborChunk: + ''' Construct a consistent definite-length array. ''' + return CborChunk( + head=CborHead(CborMajorType.ARRAY, len(val)), + content=list(val) + ) + + +def cbor_chunk_map(val: Dict[CborChunk, CborChunk]) -> CborChunk: + ''' Construct a consistent definite-length map. ''' + return CborChunk( + head=CborHead(CborMajorType.MAP, len(val)), + content=[ + item for pair in val.items() for item in pair + ] + ) + + +def cbor_chunk_simple(val: CborSimpleValue) -> CborChunk: + ''' Construct a consistent simple value. ''' + return CborChunk( + head=CborHead(major=CborMajorType.OTHERS, argument=val.value), + content=CborSimpleValue(val) + ) + + +def cbor_chunk_float(val: float) -> CborChunk: + ''' Construct a consistent floating point value. ''' + arg = struct.pack('!d', val) + + return CborChunk( + head=CborHead(major=CborMajorType.OTHERS, argument=arg), + content=float(val) + ) + + +def cbor_chunk_indef(major: CborMajorType, content=None) -> CborChunk: + ''' Construct an indefinite-length start item. + + :param major: The major type enum. + :param content: Optional content of the container. + If not None, this will have a break item appended to it. + :return: The chunk object. + ''' + if content is not None: + content = tuple(content) + (cbor_chunk_break(),) + + return CborChunk( + head=CborHead(major=major, argument=None), + content=content + ) + + +def cbor_chunk_break() -> CborChunk: + ''' Construct an indefinite-length break item. ''' + return CborChunk( + head=CborHead(major=CborMajorType.OTHERS, argument=None) + ) + + +def cbor_encode_chunk(chunk: CborChunk) -> bytearray: + ''' Encode a chunk without recursion. + + :param chunk: The chunk to encode. + :return: The serialized form of data. + ''' + content = chunk.content + if isinstance(content, (int, CborSimpleValue, float)): + # not needed here + content = None + elif isinstance(content, str): + # pre-encode text to get byte length + content = content.encode('utf8') + + # use of argument for length must already be set before this + buf = cbor_encode_head(chunk.head) + + # all other types than these are not encoded content but internal semantic use + if isinstance(content, bytes): + buf += content + elif isinstance(content, (list, tuple)): + # recurse where possible + for sub in content: + buf += cbor_encode_chunk(sub) + + return buf + + +def cbor_decode_sequence(items: List[CborChunk], data: bytearray, + count: Union[int, None, False], + must_major: Optional[CborMajorType] = None) -> int: + ''' Decode a CBOR sequence recursively. + + :param items: The list to append to. + For an indefinite-length container, this will contain the break item. + :param data: The data to read and slice off of. 
+ :param count: The number of items (top chunks) to decode or + None to iterate until the CBOR break item is seen or + False to iterate until the data is all read. + :param must_major: If not None, the required major type for all items. + :return: The total size of bytes read. + ''' + if count is None or count is False: + repeat = itertools.repeat(None) + else: + repeat = range(count) + + allused = 0 + for _ix in repeat: + used, chunk = cbor_decode_chunk(data, recurse=True) + if chunk is None: + if count is False: + break + else: + raise ValueError('Not enough items available') + + got_break = chunk.is_break() + if got_break: + if count is not None: + raise ValueError('Got break item in definite-length sequence') + else: + if must_major is not None and chunk.head.major != must_major: + raise ValueError(f'Require major type {must_major}' + f' got {chunk.head.major}') + + allused += used + items.append(chunk) + # include the break item in the list + if got_break: + break + + return allused + + +def cbor_decode_chunk(data: bytearray, + recurse: bool = False) -> Tuple[int, Optional[CborChunk]]: + ''' Decode a chunk by iterating through all tags until another + major type is seen. + + :param data: The data to read and slice off the used portion. + :param recurse: If true, recurse into the :py:attr:`CborChunk.content` + after the head value. + :return: A tuple of: the size slided off and the chunk which was read in. + ''' + tags = [] + while True: + used, head = cbor_decode_head(data) + if head is None: + return 0, None + + if head.major == CborMajorType.TAG: + tags.append(head.argument) + else: + break + + # Handle content when requested + cnt = None + match head.major: + case CborMajorType.UINT: + cnt = head.argument + case CborMajorType.NINT: + cnt = -1 - head.argument + + case CborMajorType.BSTR: + if recurse: + if head.argument is None: + cnt = [] + used += cbor_decode_sequence(cnt, data, None, head.major) + else: + cnt = data[:head.argument] + del data[:head.argument] + used += head.argument + + case CborMajorType.TSTR: + if recurse: + if head.argument is None: + cnt = [] + used += cbor_decode_sequence(cnt, data, None, head.major) + else: + cnt = data[:head.argument].decode('utf8') + del data[:head.argument] + used += head.argument + + case CborMajorType.ARRAY: + if recurse: + cnt = [] + used += cbor_decode_sequence(cnt, data, head.argument) + + case CborMajorType.MAP: + if recurse: + # the map size is number of pairs, not items + count = 2 * head.argument if head.argument is not None else None + + tmp = [] + used += cbor_decode_sequence(tmp, data, count) + + tmpit = iter(tmp) + cnt = {} + for key, sval in zip(tmpit, tmpit): + cnt[key] = sval + + case CborMajorType.OTHERS: + if isinstance(head.argument, int): + cnt = CborSimpleValue(head.argument) + + elif isinstance(head.argument, bytes): + # float values decoded from the raw data + arglen = len(head.argument) + if arglen == 2: + fmt = 'e' + elif arglen == 4: + fmt = 'f' + elif arglen == 8: + fmt = 'd' + else: + raise ValueError(f'invalid float length {arglen}') + cnt = struct.unpack('!' 
+ fmt, head.argument)[0] + + elif head.argument is None: + cnt = None + + else: + raise ValueError(f'invalid other argument type {type(head.argument)}') + + chunk = CborChunk(tags=tuple(tags), head=head, content=cnt) + return used, chunk diff --git a/scapy/cborfields.py b/scapy/cborfields.py new file mode 100644 index 00000000000..afbc1755ec0 --- /dev/null +++ b/scapy/cborfields.py @@ -0,0 +1,306 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information +# Copyright (C) Brian Sipos + +""" +Classes that implement CBOR leaf data structures. +""" + +from typing import Any, Optional, List, Tuple +from .fields import I, M, Field, AnyField +from .packet import Packet +from .volatile import RandNum, RandBin +from .cbor import ( + CborMajorType, CborSimpleValue, CborHead, CborChunk, + cbor_chunk_int, cbor_chunk_bstr, cbor_encode_chunk, cbor_decode_chunk +) + + +class CborItemBase: + ''' Mixin class to decode and encode CBOR items into a CBOR packet. ''' + + decode_recurse = False + ''' If true, the chunk decoding is recursive. ''' + + def addfield(self, pkt, s, val): + # type: (Packet, bytes, Optional[I]) -> bytes + mval = self.i2m(pkt, val) + if mval is None: + return s + elif isinstance(mval, CborChunk): + data = cbor_encode_chunk(mval) + self._inc_seen(pkt) + return s + bytes(data) + else: + return s + mval + + def getfield(self, pkt, s): + # type: (Packet, bytes) -> Tuple[bytes, I] + buf = bytearray(s) + _used, chunk = cbor_decode_chunk(buf, recurse=self.decode_recurse) + self._inc_seen(pkt) + return bytes(buf), self.m2i(pkt, chunk) + + def _inc_seen(self, pkt): + ''' Increment the seen item counter on a CBOR packet. ''' + if hasattr(pkt, 'array_seen_items'): + pkt.array_seen_items += 1 + + +class CborAnyField(CborItemBase, Field[CborChunk, CborChunk]): + ''' Special case to handle sequences of chunks recursively. ''' + + decode_recurse = True + + def __init__(self, name, default=None): + # type: (str, Optional[int]) -> None + Field.__init__(self, name, default, "H") + + def i2repr(self, _pkt, x): + # type: (Optional[Packet], I) -> str + return str(x) + + +class CborBoolField(CborItemBase, Field[bool, CborChunk]): + def __init__(self, name, default=None): + # type: (str, Optional[int]) -> None + Field.__init__(self, name, default, "H") + + def m2i(self, _pkt, x): + # type: (Optional[Packet], M) -> I + if x.head.major == CborMajorType.OTHERS: + if x.head.argument == CborSimpleValue.TRUE: + return True + elif x.head.argument == CborSimpleValue.FALSE: + return False + + raise TypeError + + def i2m(self, _pkt, x): + # type: (Optional[Packet], Optional[I]) -> M + return cbor_chunk_int(int(x)) + + +class CborUintField(CborItemBase, Field[int, CborChunk]): + ''' Allow non-negative integer values. 
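+    The optional maxval only bounds randval() fuzzing; it is not enforced
+    by i2m() or m2i().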
''' + + def __init__(self, name, default=None, maxval=None): + # type: (str, Optional[int]) -> None + Field.__init__(self, name, default, "H") + self.maxval = maxval + + def m2i(self, _pkt, x): + # type: (Optional[Packet], M) -> I + if x is None: + return None + + if x.head.major == CborMajorType.UINT: + return x.head.argument + + raise ValueError(f'Can only accept uint values, got {x.head.major}') + + def i2m(self, _pkt, x): + # type: (Optional[Packet], Optional[I]) -> M + if x is None: + return None + + if x >= 0: + major = CborMajorType.UINT + arg = x + else: + raise ValueError('Can only accept uint values') + + return CborChunk(head=CborHead(major=major, argument=arg)) + + def randval(self): + # type: () -> I + return RandNum(0, self.maxval) + + +class CborEnumField(CborUintField): + ''' An unsigned integer containing an enumerated value. + + :param enum: Available values for the field. + :type enum: :py:cls:`enum.IntEnum` + ''' + __slots__ = ( + 'enum', + ) + + def __init__(self, name, default, enum): + maxval = 0 + for val in enum: + maxval = max(maxval, int(val)) + self.enum = enum + + CborUintField.__init__(self, name, default, maxval) + + def m2i(self, pkt, val): + val = CborUintField.m2i(self, pkt, val) + if val is not None: + val = self.enum(val) + return val + + +class CborFlagsField(CborUintField): + ''' An unsigned integer containing enumerated flags. + + :param flags: Available flags for the field. + :type flags: :py:cls:`enum.IntFlag` + ''' + __slots__ = ( + 'flags', + ) + + def __init__(self, name, default, flags): + maxval = 0 + for val in flags: + maxval |= int(val) + self.flags = flags + + CborUintField.__init__(self, name, default, maxval) + + def m2i(self, pkt, x): + x = CborUintField.m2i(self, pkt, x) + if x is not None: + x = self.flags(x) + return x + + +class CborIntField(CborItemBase, Field[int, CborChunk]): + ''' Allow non-negative and negative integer values. ''' + + def __init__(self, name, default=None): + # type: (str, Optional[int]) -> None + Field.__init__(self, name, default, "H") + + def m2i(self, _pkt, x): + # type: (Optional[Packet], M) -> I + if x is None: + return None + + if x.head.major == CborMajorType.UINT: + return x.head.argument + elif x.head.major == CborMajorType.NINT: + return -1 - x.head.argument + + raise ValueError(f'Can only accept int values, got {x.head.major}') + + def i2m(self, _pkt, x): + # type: (Optional[Packet], Optional[I]) -> M + if x is None: + return None + + if x >= 0: + major = CborMajorType.UINT + arg = x + else: + major = CborMajorType.NINT + arg = -1 - x + + return CborChunk(head=CborHead(major=major, argument=arg)) + + def randval(self): + # type: () -> I + return RandNum(-2**64, 2**64 - 1) + + +class CborBstrField(CborItemBase, Field[bytes, CborChunk]): + ''' Allow byte string values. + + The human form of this field is as a hex-encoded text. 
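+    The optional maxlen is enforced when decoding (m2i) and bounds
+    randval() fuzzing.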
+ ''' + + decode_recurse = True + ''' By default include the actual string content ''' + + def __init__(self, name, default=None, maxlen=None): + # type: (str, Optional[bytes], Optional[int]) -> None + Field.__init__(self, name, default, "H") + self.maxlen = maxlen + + def i2repr(self, pkt, x): + # type: (Optional[Packet], I) -> str + if x is None: + return None + + return x.hex() + + def m2i(self, _pkt, x): + # type: (Optional[Packet], M) -> I + if x is None: + return None + + if x.head.major != CborMajorType.BSTR: + raise ValueError('Can only accept bstr values') + if self.maxlen is not None and len(x.content) > self.maxlen: + raise ValueError(f'Length of bstr {len(x.content)} ' + + f'longer than {self.maxlen}') + return bytes(x.content) + + def i2m(self, _pkt, x): + # type: (Optional[Packet], Optional[I]) -> M + if x is None: + return None + + return cbor_chunk_bstr(x) + + def i2h(self, _pkt, x): + if x is None: + return None + + return x.hex() + + def randval(self): + # type: () -> I + return RandBin(RandNum(0, self.maxlen or 256)) + + +class CborFieldArrayField(CborItemBase, Field[List[Any], List[CborChunk]]): + ''' A field which manages a list of sub-field values encoded in a + definite-length array. + ''' + + def __init__( + self, + name, # type: str + default, # type: Optional[List[AnyField]] + field, # type: AnyField + max_count=None, # type: Optional[int] + ): + # type: (...) -> None + if default is None: + default = [] # Create a new list for each instance + self.field = field + Field.__init__(self, name, default) + self.max_count = max_count + + def addfield(self, pkt, s, val): + # type: (Packet, bytes, Optional[I]) -> bytes + chunk = CborChunk(head=CborHead(CborMajorType.ARRAY, len(val))) + data = cbor_encode_chunk(chunk) + s += bytes(data) + + for ival in val: + s = self.field.addfield(pkt, s, ival) + + return s + + def getfield(self, pkt, s): + # type: (Packet, bytes) -> Tuple[bytes, I] + buf = bytearray(s) + _used, chunk = cbor_decode_chunk(buf, recurse=self.decode_recurse) + if chunk.head.major != CborMajorType.ARRAY: + raise ValueError(f'Field must be an array, got {chunk.head.major}') + + if self.max_count is not None and chunk.head.argument > self.max_count: + raise ValueError(f'Array size {chunk.head.argument} larger ' + + f'than maximum {self.max_count}') + + ivals = [] + for _ix in range(chunk.head.argument): + buf, ival = self.field.getfield(pkt, buf) + ivals.append(ival) + + return bytes(buf), ivals diff --git a/scapy/cborpacket.py b/scapy/cborpacket.py new file mode 100644 index 00000000000..a7da15557f2 --- /dev/null +++ b/scapy/cborpacket.py @@ -0,0 +1,147 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information +# Copyright (C) Brian Sipos + + +""" +Classes which support array-based CBOR packets. +""" + +from typing import Callable, List, Optional, Tuple, Type +from .packet import Packet +from .cbor import ( + CBOR_INDEF_BREAK, + CborMajorType, CborHead, cbor_encode_head, cbor_decode_chunk +) +from .cborfields import CborIntField + + +class CborSequencePacket(Packet): + ''' A sequence of items, one item for each field in the packet. + This packet does not include any head framing (e.g. an array). + ''' + + +class CborArrayPacket(CborSequencePacket): + ''' An array of items, one for each field in the packet. + Any additional bytes after the enclosing array are considered padding. + + The :py:inst:`cbor_use_indefinite` controls whether the encoded array is + indefinite length or not. 
+ + The decoder will handle indefinite-length arrays according to the data and + store the original array argument (item count) in :py:inst:`array_head_arg`. + For both encoding and decoding, the member :py:inst:`array_seen_items` is + used to count the number of immediate items in the array. + ''' + + cbor_use_indefinite = False + ''' By default encode to definite length array. ''' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.array_head_arg = None + self.array_seen_items = 0 + + def _inc_seen(self, pkt): + # type: (Optional[Packet]) -> None + ''' Increment the seen item counter on a parent CBOR packet. ''' + if hasattr(pkt, 'array_seen_items'): + pkt.array_seen_items += 1 + + def self_build(self): + # type: () -> bytes + if self.raw_packet_cache is not None: + return self.raw_packet_cache + + seqdata = super().self_build() + # notify parent of this array item + self._inc_seen(self.parent) + + # define prepended array framing + if self.cbor_use_indefinite: + head = bytes(cbor_encode_head(CborHead(CborMajorType.ARRAY, None))) + tail = bytes(cbor_encode_head(CborHead(CborMajorType.OTHERS, None))) + else: + head = bytes(cbor_encode_head(CborHead(CborMajorType.ARRAY, + self.array_seen_items))) + tail = b'' + + return head + seqdata + tail + + def do_build_payload(self): + # type: () -> bytes + return b'' + + def do_dissect(self, x): + # type: (bytes) -> bytes + + # inspect envelope + buf = bytearray(x) + _used, chunk = cbor_decode_chunk(buf, recurse=False) + if chunk.head.major != CborMajorType.ARRAY: + raise ValueError(f'Must have array head, got {chunk.head.major}') + + self.array_head_arg = chunk.head.argument + start_indefinite = chunk.head.argument is None + # notify parent of this array item + self._inc_seen(self.parent) + + res = super().do_dissect(bytes(buf)) + + if start_indefinite: + # match an indefinite break with an indefinite array head + try: + nextres = res[0] + res = res[1:] + except IndexError: + nextres = None + if nextres != 0xff: + raise ValueError(f'Need an indefinite break, have {nextres}') + + # ensure the cache is the full original data + self.raw_packet_cache = x + return res + + def extract_padding(self, s): + # type: (bytes) -> Tuple[bytes, Optional[bytes]] + return b'', s + + +def cbor_array_item_cb(pkt_cls: Type[Packet]) -> Callable: + ''' Build a callback to satisfy the :py:cls:`PacketListField` + parameter for `next_cls_cb`. + + :param pkt_cls: The class to decode when the array has not ended. + :return: A callback function. 
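+    The callback keeps returning pkt_cls while the enclosing CborArrayPacket
+    still expects items (its definite-length count has not been reached, or
+    no break byte has been seen for an indefinite-length array) and returns
+    None afterwards.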
+ ''' + + def next_item_cb(pkt: Packet, + _lst: List[Packet], + _cur: Optional[Packet], + remain: bytes, + ) -> Optional[Type[Packet]]: + ''' Determine if there is a next block to decode ''' + + if isinstance(pkt, CborArrayPacket): + if pkt.array_head_arg is not None: + # definite length counter + if pkt.array_seen_items < pkt.array_head_arg: + return pkt_cls + else: + # indefinite length until break + if remain and not remain.startswith(CBOR_INDEF_BREAK): + return pkt_cls + return None + + return next_item_cb + + +class CborTestPkt(CborArrayPacket): + ''' Dummy test packet ''' + fields_desc = [ + CborIntField('one', 5), + CborIntField('two', None), + ] diff --git a/scapy/contrib/bpv7.py b/scapy/contrib/bpv7.py new file mode 100644 index 00000000000..632f50d7bd9 --- /dev/null +++ b/scapy/contrib/bpv7.py @@ -0,0 +1,489 @@ + +import crcmod +import datetime +import enum +import logging +import struct +from typing import Any, Optional +from scapy import volatile +from scapy.config import conf +from scapy.fields import I, ConditionalField, PacketField, PacketListField +from scapy.packet import Packet, NoPayload, Raw, bind_layers +from scapy.cbor import ( + CborMajorType, cbor_chunk_int, cbor_chunk_tstr, cbor_chunk_array +) +from scapy.cborfields import ( + CborAnyField, CborUintField, CborIntField, CborEnumField, + CborFlagsField, CborBstrField, CborFieldArrayField +) +from scapy.cborpacket import ( + CborArrayPacket, CborSequencePacket, + cbor_array_item_cb +) + +LOG_RUNTIME = logging.getLogger("scapy.runtime") + + +class DtnTimeField(CborUintField): + ''' A DTN time value representing number of milliseconds from the + DTN epoch 2000-01-01T00:00:00Z. + + This value is automatically converted from a + :py:cls:`datetime.datetime` object and human friendly text in ISO8601 + format. + The special human value "zero" represents the zero value time. + ''' + + # Epoch reference for DTN Time + DTN_EPOCH = datetime.datetime(2000, 1, 1, 0, 0, 0, 0, datetime.timezone.utc) + + @staticmethod + def datetime_to_dtntime(val): + if val is None: + return 0 + delta = val - DtnTimeField.DTN_EPOCH + return int(delta / datetime.timedelta(milliseconds=1)) + + @staticmethod + def dtntime_to_datetime(val): + if val == 0 or val is None: + return None + delta = datetime.timedelta(milliseconds=val) + return delta + DtnTimeField.DTN_EPOCH + + def i2h(self, pkt, x): + dtval = DtnTimeField.dtntime_to_datetime(x) + if dtval is None: + return 'zero' + return dtval.isoformat(timespec='milliseconds') + + def i2repr(self, pkt, x): + return self.i2h(pkt, x) + + def h2i(self, pkt, x): + return self.any2i(pkt, x) + + def any2i(self, pkt, x): + if x is None: + return None + + elif isinstance(x, datetime.datetime): + return DtnTimeField.datetime_to_dtntime(x) + + elif isinstance(x, (str, bytes)): + return DtnTimeField.datetime_to_dtntime( + datetime.datetime.fromisoformat(x) + ) + + return int(x) + + def randval(self): + return volatile.RandNum(0, int(2 ** 16)) + + +class BundleTimestamp(CborArrayPacket): + ''' A structured representation of an DTN Timestamp. + The timestamp is a two-tuple of (time, sequence number) + The creation time portion is automatically converted from a + :py:cls:`datetime.datetime` object and text. + ''' + fields_desc = ( + DtnTimeField('dtntime', default=0), + CborUintField('seqno', default=0), + ) + + +class BundleEidPacket(CborArrayPacket): + ''' A structured representation of a BP Endpoint ID (EID) as a packet. + The EID is a two-item array of (scheme ID, scheme-specific part). 
+ ''' + fields_desc = ( + CborUintField('scheme', default=None), + CborAnyField('ssp', default=None), + ) + + +class BundleEidField(CborAnyField): + ''' Provide a human-friendly representation of a BP Endpoint ID (EID) as + a single field. + The EID is a two-item array of (scheme ID, scheme-specific part). + ''' + + def i2h(self, _pkt, x): + # Translate to text form for known schemes + if x is None: + return None + if x.head.major != CborMajorType.ARRAY: + raise ValueError(f'EID must be enclosed in an array, got {x.head.major}') + + scheme_id = x.content[0].head.argument + ssp_items = x.content[1].content + match scheme_id: + case 1: + # DTN scheme + return 'dtn:' + ssp_items + case 2: + # IPN scheme, 2 or 3 element forms + parts = [chunk.head.argument for chunk in ssp_items] + return 'ipn:' + '.'.join(['{:d}'.format(part) for part in parts]) + case _: + raise ValueError(f'BP EID scheme {scheme_id} not understood') + + def h2i(self, _pkt, x): + # type: (Optional[Packet], Any) -> I + if x is None: + return None + + scheme, ssp = x.split(':', 1) + scheme = scheme.lower() + scheme_id = None + ssp_item = None + match scheme: + case 'dtn': + scheme_id = 1 + ssp_item = cbor_chunk_tstr(ssp) + case 'ipn': + # force handling as decimal + parts = [int(part, 10) for part in ssp.split('.')] + + scheme_id = 2 + ssp_item = cbor_chunk_array([ + cbor_chunk_int(part) + for part in parts + ]) + case _: + raise ValueError(f'BP EID scheme {scheme} not understood') + + return cbor_chunk_array([ + cbor_chunk_int(scheme_id), + ssp_item + ]) + + def i2repr(self, pkt, x): + return self.i2h(pkt, x) + + +class AbstractBlock(CborArrayPacket): + ''' Represent an abstract block with CRC fields. + + .. py:attribute:: crc_type_name + The name of the CRC-type field. + .. py:attribute:: crc_value_name + The name of the CRC-value field. + ''' + + @enum.unique + class CrcType(enum.IntEnum): + ''' CRC type values. + ''' + NONE = 0 + CRC16 = 1 + CRC32 = 2 + + # Map from CRC type to algorithm + CRC_DEFN = { + CrcType.CRC16: { # BPv7 CRC-16 X.25 + 'func': crcmod.predefined.mkPredefinedCrcFun('x-25'), + 'encode': lambda val: struct.pack('>H', val) + }, + CrcType.CRC32: { # BPv7 CRC-32 Castagnoli + 'func': crcmod.predefined.mkPredefinedCrcFun('crc-32c'), + 'encode': lambda val: struct.pack('>L', val) + }, + } + + crc_type_name = 'crc_type' + crc_value_name = 'crc_value' + + def fill_fields(self): + ''' Fill all fields so that the block is the full size it needs + to be for encoding encoding with build(). + Derived classes should populate their block-type-specific-data also. + ''' + crc_type = self.getfieldval(self.crc_type_name) + crc_value = self.fields.get(self.crc_value_name) + if crc_type and not crc_value: + defn = AbstractBlock.CRC_DEFN[crc_type] + # Encode with a zero-valued CRC field + self.fields[self.crc_value_name] = defn['encode'](0) + + def update_crc(self, keep_existing=False): + ''' Update this block's CRC field from the current field data + only if the current CRC (field not default) value is None. 
+ ''' + if self.crc_type_name is None or self.crc_value_name is None: + return + + crc_type = self.getfieldval(self.crc_type_name) + if crc_type == 0: + crc_value = None + else: + crc_value = self.fields.get(self.crc_value_name) + if not keep_existing or crc_value is None: + defn = AbstractBlock.CRC_DEFN[crc_type] + # Encode with a zero-valued CRC field + self.fields[self.crc_value_name] = defn['encode'](0) + pre_crc = self.build() + crc_int = defn['func'](pre_crc) + crc_value = defn['encode'](crc_int) + + self.fields[self.crc_value_name] = crc_value + + def check_crc(self): + ''' Check the current CRC value, if enabled. + :return: True if the CRC is disabled or it is valid. + ''' + if self.crc_type_name is None or self.crc_value_name is None: + return True + + crc_type = self.getfieldval(self.crc_type_name) + crc_value = self.fields.get(self.crc_value_name) + if crc_type == 0: + valid = crc_value is None + else: + defn = AbstractBlock.CRC_DEFN[crc_type] + # Encode with a zero-valued CRC field + self.fields[self.crc_value_name] = defn['encode'](0) + pre_crc = self.build() + crc_int = defn['func'](pre_crc) + valid = crc_value == defn['encode'](crc_int) + # Restore old value + self.fields[self.crc_value_name] = crc_value + + return valid + + +class PrimaryBlock(AbstractBlock): + ''' The primary block definition ''' + + @enum.unique + class Flag(enum.IntFlag): + ''' Bundle processing control flags. + ''' + REQ_DELETION_REPORT = 0x040000 + ''' bundle deletion status reports are requested. ''' + REQ_DELIVERY_REPORT = 0x020000 + ''' bundle delivery status reports are requested. ''' + REQ_FORWARDING_REPORT = 0x010000 + ''' bundle forwarding status reports are requested. ''' + REQ_RECEPTION_REPORT = 0x004000 + ''' bundle reception status reports are requested. ''' + REQ_STATUS_TIME = 0x000040 + ''' status time is requested in all status reports. ''' + USER_APP_ACK = 0x000020 + ''' user application acknowledgement is requested. ''' + NO_FRAGMENT = 0x000004 + ''' bundle must not be fragmented. ''' + PAYLOAD_ADMIN = 0x000002 + ''' payload is an administrative record. ''' + IS_FRAGMENT = 0x000001 + ''' bundle is a fragment. ''' + + fields_desc = ( + CborUintField('bp_version', default=7), + CborFlagsField('bundle_flags', default=0, flags=Flag), + CborEnumField('crc_type', default=AbstractBlock.CrcType.NONE, + enum=AbstractBlock.CrcType), + BundleEidField('destination', default=None), + BundleEidField('source', default=None), + BundleEidField('report_to', default=None), + PacketField('create_ts', default=BundleTimestamp(), + pkt_cls=BundleTimestamp), + CborUintField('lifetime', default=0), + ConditionalField( + CborUintField('fragment_offset', default=0), + lambda block: (block.getfieldval('bundle_flags') + & PrimaryBlock.Flag.IS_FRAGMENT) + ), + ConditionalField( + CborUintField('total_app_data_len', default=0), + lambda block: (block.getfieldval('bundle_flags') + & PrimaryBlock.Flag.IS_FRAGMENT) + ), + ConditionalField( + CborBstrField('crc_value'), + lambda block: block.getfieldval('crc_type') != 0 + ), + ) + + +class CanonicalBlock(AbstractBlock): + ''' The canonical block definition with a type-specific payload. + + Any payload of this block is encoded as the "data" field when building + and decoded from the "data" field when dissecting. + ''' + + @enum.unique + class Flag(enum.IntFlag): + ''' Block processing control flags ''' + REMOVE_IF_NO_PROCESS = 0x10 + ''' block must be removed from bundle if it can't be processed. 
''' + DELETE_IF_NO_PROCESS = 0x04 + ''' bundle must be deleted if block can't be processed. ''' + STATUS_IF_NO_PROCESS = 0x02 + ''' transmission of a status report is requested if block can't be + processed. ''' + REPLICATE_IN_FRAGMENT = 0x01 + ''' block must be replicated in every fragment. ''' + + fields_desc = ( + CborUintField('type_code', default=None), + CborUintField('block_num', default=None), + CborFlagsField('block_flags', default=0, flags=Flag), + CborEnumField('crc_type', default=AbstractBlock.CrcType.NONE, + enum=AbstractBlock.CrcType), + CborBstrField('btsd', default=None), # block-type-specific data here + ConditionalField( + CborBstrField('crc_value'), + lambda block: block.crc_type != 0 + ), + ) + + def ensure_block_type_specific_data(self): + ''' Embed payload as BTSD if not already present. + ''' + if isinstance(self.payload, NoPayload): + return + if self.fields.get('btsd') is not None: + # already present + return + + pay_data = self.payload.do_build() + self.fields['btsd'] = pay_data + + def fill_fields(self): + self.ensure_block_type_specific_data() + super().fill_fields() + + def self_build(self, *args, **kwargs): + self.ensure_block_type_specific_data() + return super().self_build(*args, **kwargs) + + def do_build_payload(self): + # Payload is handled by self_build() + return b'' + + def post_dissect(self, s): + # Extract payload from fields + blk_type = self.fields.get('type_code') + blk_data = self.fields.get('btsd') + + cls = None + if blk_data is not None and blk_type is not None: + try: + cls = self.guess_payload_class(None) + except KeyError: + pass + + if cls is not None and cls is not Raw: + try: + pay = cls(blk_data) + self.add_payload(pay) + except Exception as err: + if conf.debug_dissector: + raise + LOG_RUNTIME.warning('Failed to decode BPv7 BTSD: %s', err) + + return super().post_dissect(s) + + +class PreviousNodeBlock(CborSequencePacket): + ''' Block data content from Section 4.4.1 of RFC 9171. + ''' + fields_desc = ( + BundleEidField('node'), + ) + + +class BundleAgeBlock(CborSequencePacket): + ''' Block data content from Section 4.4.2 of RFC 9171. + ''' + fields_desc = ( + CborUintField('age'), + ) + + +class HopCountBlock(CborArrayPacket): + ''' Block data content from Section 4.4.3 of RFC 9171. + ''' + fields_desc = ( + CborUintField('limit'), + CborUintField('count'), + ) + + +bind_layers(CanonicalBlock, PreviousNodeBlock, type_code=6) +bind_layers(CanonicalBlock, BundleAgeBlock, type_code=7) +bind_layers(CanonicalBlock, HopCountBlock, type_code=10) + + +class BpsecKeyValPair(CborArrayPacket): + fields_desc = ( + CborUintField('key'), + CborAnyField('val'), + ) + + +class BpsecKeyValList(CborArrayPacket): + fields_desc = ( + PacketListField('pairs', [], pkt_cls=BpsecKeyValPair, + count_from=lambda pkt: pkt.array_head_arg), + ) + + +class BpsecKeyValListList(CborArrayPacket): + fields_desc = ( + PacketListField('items', [], pkt_cls=BpsecKeyValList, + count_from=lambda pkt: pkt.array_head_arg), + ) + + +class AbstractSecurityBock(CborSequencePacket): + ''' Block data content from Section 3.6 of RFC 9172. + ''' + + @enum.unique + class Flag(enum.IntFlag): + ''' ASB flags. + Defined in Section 3.6 of RFC 9172. + ''' + PARAMETERS = 0x01 + ''' Security context parameters present. 
''' + + fields_desc = ( + CborFieldArrayField('targets', [], field=CborIntField('blk_num')), + CborIntField('context_id'), + CborFlagsField('flags', 0, flags=Flag), + BundleEidField('source', default=None), + ConditionalField( + PacketField('parameters', [], pkt_cls=BpsecKeyValList), + cond=lambda pkt: pkt.flags & AbstractSecurityBock.Flag.PARAMETERS + ), + # one packet in this list per target + PacketField('tgt_results', [], pkt_cls=BpsecKeyValListList), + ) + + +bind_layers(CanonicalBlock, AbstractSecurityBock, type_code=11) +bind_layers(CanonicalBlock, AbstractSecurityBock, type_code=12) + + +class BundleV7(CborArrayPacket): + ''' An entire decoded bundle contents. + + Bundles with administrative records are handled specially in that the + AdminRecord object will be made a (scapy) payload of the "payload block" + which is block type code 1. + ''' + + BLOCK_TYPE_PAYLOAD = 1 + BLOCK_NUM_PAYLOAD = 1 + + cbor_use_indefinite = True + ''' The bundle PDU used indefinite length. ''' + fields_desc = ( + PacketField('primary', default=None, pkt_cls=PrimaryBlock), + PacketListField('blocks', default=[], + next_cls_cb=cbor_array_item_cb(CanonicalBlock)), + ) diff --git a/test/contrib/bpv7.uts b/test/contrib/bpv7.uts new file mode 100644 index 00000000000..d85eac92867 --- /dev/null +++ b/test/contrib/bpv7.uts @@ -0,0 +1,100 @@ +% Bundle Protocol Version 7 tests for Scapy + ++ BPv7 EID + += decode human DTN + +from scapy.contrib.bpv7 import * +fld = BundleEidField('eid') +ival = fld.h2i(None, 'dtn://node/pa/th') +data = fld.addfield(None, b'', fld.i2m(None, ival)) +print(data.hex()) +assert data == bytes.fromhex('82016C2F2F6E6F64652F70612F7468') + += decode human IPN 2-element + +from scapy.contrib.bpv7 import * +fld = BundleEidField('eid') +ival = fld.h2i(None, 'ipn:4294967298.3') +data = fld.addfield(None, b'', fld.i2m(None, ival)) +print(data.hex()) +assert data == bytes.fromhex('8202821B000000010000000203') + += decode human IPN 3-element + +from scapy.contrib.bpv7 import * +fld = BundleEidField('eid') +ival = fld.h2i(None, 'ipn:1.2.3') +data = fld.addfield(None, b'', fld.i2m(None, ival)) +print(data.hex()) +assert data == bytes.fromhex('820283010203') + ++ BPv7 CODEC + += construction + +from scapy.contrib.bpv7 import * +pkt = BundleV7() + += decoding example from Appendix A.1.1.3 of RFC 9173 + +from scapy.contrib.bpv7 import * +data = bytes.fromhex('9f88070000820282010282028202018202820201820018281a000f424085010100005823526561647920746f2067656e657261746520612033322d62797465207061796c6f6164ff') +pkt = BundleV7(data) +print(repr(pkt)) +pkt.show() +assert pkt.primary.source == 'ipn:2.1' +assert pkt.primary.destination == 'ipn:1.2' + +pkt.clear_cache() +outdata = bytes(pkt) +print(outdata.hex()) +assert outdata == data + += decoding example from Appendix A.1.1.4 of RFC 9173 + +from scapy.contrib.bpv7 import * +data = bytes.fromhex('9f88070000820282010282028202018202820201820018281a000f424085070200004319012c85010100005823526561647920746f2067656e657261746520612033322d62797465207061796c6f6164ff') +pkt = BundleV7(data) +print(repr(pkt)) +pkt.show() +assert pkt.primary.source == 'ipn:2.1' +assert pkt.primary.destination == 'ipn:1.2' + +pkt.clear_cache() +outdata = bytes(pkt) +print(outdata.hex()) +assert outdata == data + += decoding example from Appendix A.1.4 of RFC 9173 + +from scapy.contrib.bpv7 import * +data = 
bytes.fromhex('9f88070000820282010282028202018202820201820018281a000f4240850b0200005856810101018202820201828201078203008181820158403bdc69b3a34a2b5d3a8554368bd1e808f606219d2a10a846eae3886ae4ecc83c4ee550fdfb1cc636b904e2f1a73e303dcd4b6ccece003e95e8164dcc89a156e185010100005823526561647920746f2067656e657261746520612033322d62797465207061796c6f6164ff') +pkt = BundleV7(data) +print(repr(pkt)) +pkt.show() +assert pkt.blocks[0].type_code == 11 +assert pkt.blocks[0].payload.targets == [1] +assert pkt.blocks[0].payload.context_id == 1 +assert pkt.blocks[0].payload.source == 'ipn:2.1' + +pkt.clear_cache() +outdata = bytes(pkt) +print(outdata.hex()) +assert outdata == data + + += decoding example from Appendix A of draft-dtn-bpsec-cose + +from scapy.contrib.bpv7 import * +data = bytes.fromhex('9f880700008201692f2f6473742f7376638201692f2f7372632f7376638201662f2f7372632f821b000000bd51281400001a000f42408501010000466568656c6c6fff') +pkt = BundleV7(data) +print(repr(pkt)) +pkt.show() +assert pkt.primary.source == 'dtn://src/svc' +assert pkt.primary.destination == 'dtn://dst/svc' + +pkt.clear_cache() +outdata = bytes(pkt) +print(outdata.hex()) +assert outdata == data diff --git a/test/scapy/layers/cbor.uts b/test/scapy/layers/cbor.uts new file mode 100644 index 00000000000..4592d67306b --- /dev/null +++ b/test/scapy/layers/cbor.uts @@ -0,0 +1,357 @@ +% CBOR regression tests for Scapy + + ++ CBOR CODEC uint + += uint encoding size0 + +chunk = CborChunk(head=CborHead(major=CborMajorType.UINT, argument=0x12)) +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('12') + += uint encoding size1 + +chunk = CborChunk(head=CborHead(major=CborMajorType.UINT, argument=0x34)) +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('1834') + += uint encoding size2 + +chunk = CborChunk(head=CborHead(major=CborMajorType.UINT, argument=0x1234)) +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('191234') + += uint encoding size4 + +chunk = CborChunk(head=CborHead(major=CborMajorType.UINT, argument=0x12345678)) +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('1a12345678') + += uint encoding size8 + +chunk = CborChunk(head=CborHead(major=CborMajorType.UINT, argument=0x1234567812345678)) +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('1b1234567812345678') + += uint decoding size0 + +data = bytearray.fromhex('12') +used,chunk = cbor_decode_chunk(data) +assert used == 1 +assert chunk.head.major == CborMajorType.UINT +assert chunk.head.argument == 18 + += uint decoding size1 + +data = bytearray.fromhex('1834') +used,chunk = cbor_decode_chunk(data) +assert used == 2 +assert chunk.head.major == CborMajorType.UINT +assert chunk.head.argument == 0x34 + += uint decoding size2 + +data = bytearray.fromhex('191234') +used,chunk = cbor_decode_chunk(data) +assert used == 3 +assert chunk.head.major == CborMajorType.UINT +assert chunk.head.argument == 0x1234 + += uint decoding size4 + +data = bytearray.fromhex('1a12345678') +used,chunk = cbor_decode_chunk(data) +assert used == 5 +assert chunk.head.major == CborMajorType.UINT +assert chunk.head.argument == 0x12345678 + += uint decoding size8 + +data = bytearray.fromhex('1b1234567812345678') +used,chunk = cbor_decode_chunk(data) +assert used == 9 +assert chunk.head.major == CborMajorType.UINT +assert chunk.head.argument == 0x1234567812345678 + ++ CBOR CODEC nint + += nint encoding size0 + +chunk = cbor_chunk_int(-0x13) +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('32') + += nint decoding size0 
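+
+# Per RFC 8949, a negative integer's head argument stores (-1 - value), so the
+# encoded byte 0x32 carries argument 0x12 and represents the value -0x13.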
+ +data = bytearray.fromhex('32') +used,chunk = cbor_decode_chunk(data) +assert used == 1 +assert chunk.head.major == CborMajorType.NINT +assert chunk.head.argument == 0x12 + += nint decoding size2 + +data = bytearray.fromhex('391234') +used,chunk = cbor_decode_chunk(data) +assert used == 3 +assert chunk.head.major == CborMajorType.NINT +assert chunk.head.argument == 0x1234 + ++ CBOR CODEC bstr + += bstr encode size0 + +chunk = cbor_chunk_bstr(b'hi') +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('426869') + += bstr decode size0 +hexdata = '426869' + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, False) +assert used == 1 +assert chunk.head.major == CborMajorType.BSTR +assert chunk.head.argument == 2 +assert chunk.content is None + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, True) +assert used == 3 +assert chunk.head.major == CborMajorType.BSTR +assert chunk.head.argument == 2 +assert chunk.content == b"hi" + += bstr decode size1 +hexdata = '58186c6f6e676c6f6e676c6f6e676c6f6e676c6f6e676c6f6e67' + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, False) +assert used == 2 +assert chunk.head.major == CborMajorType.BSTR +assert chunk.head.argument == 24 +assert chunk.content is None + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, True) +assert used == 26 +assert chunk.head.major == CborMajorType.BSTR +assert chunk.head.argument == 24 +assert chunk.content == b"longlonglonglonglonglong" + += bstr encode size-indefinite content +chunk = cbor_chunk_indef(CborMajorType.BSTR, [cbor_chunk_bstr(b'hi'), cbor_chunk_bstr(b'oh')]) +data = cbor_encode_chunk(chunk) +print(data.hex()) +assert data == bytearray.fromhex('5F426869426F68FF') + += bstr encode size-indefinite parts +chunks = [ + cbor_chunk_indef(CborMajorType.BSTR), + cbor_chunk_bstr(b'hi'), + cbor_chunk_bstr(b'oh'), + cbor_chunk_break(), +] +data = b''.join(cbor_encode_chunk(chunk) for chunk in chunks) +print(data.hex()) +assert data == bytearray.fromhex('5F426869426F68FF') + += bstr decode size-indefinite +hexdata = '5F426869426F68FF' + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, False) +assert used == 1 +assert chunk.head.major == CborMajorType.BSTR +assert chunk.head.argument is None +assert chunk.content is None + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, True) +assert used == 8 +assert chunk.head.major == CborMajorType.BSTR +assert chunk.head.argument is None +assert chunk.content == [cbor_chunk_bstr(b'hi'), cbor_chunk_bstr(b'oh'), cbor_chunk_break()] + ++ CBOR CODEC tstr + += tstr encode size0 + +chunk = cbor_chunk_tstr('hi') +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('626869') + += tstr decode size0 +hexdata = '626869' + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, False) +assert used == 1 +assert chunk.head.major == CborMajorType.TSTR +assert chunk.head.argument == 2 +assert chunk.content is None + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, True) +assert used == 3 +assert chunk.head.major == CborMajorType.TSTR +assert chunk.head.argument == 2 +assert chunk.content == "hi" + += tstr decode size1 +hexdata = '78186c6f6e676c6f6e676c6f6e676c6f6e676c6f6e676c6f6e67' + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, False) +assert used == 2 +assert chunk.head.major == CborMajorType.TSTR +assert chunk.head.argument == 24 +assert chunk.content is None + +data = 
bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, True) +assert used == 26 +assert chunk.head.major == CborMajorType.TSTR +assert chunk.head.argument == 24 +assert chunk.content == "longlonglonglonglonglong" + + ++ CBOR CODEC array + += array encode size0 + +chunk = cbor_chunk_array([cbor_chunk_int(10), cbor_chunk_int(-20)]) +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('820A33') + += array decode size0 +hexdata = '820A33' + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, False) +assert used == 1 +assert chunk.head.major == CborMajorType.ARRAY +assert chunk.head.argument == 2 +assert chunk.content is None + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, True) +assert used == 3 +assert chunk.head.major == CborMajorType.ARRAY +assert chunk.head.argument == 2 +assert chunk.content == [cbor_chunk_int(10), cbor_chunk_int(-20)] + + ++ CBOR CODEC map + += map encode size0 + +chunk = cbor_chunk_map({cbor_chunk_int(10): cbor_chunk_int(-20)}) +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('A10A33') + += map decode size0 +hexdata = 'A10A33' + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, False) +assert used == 1 +assert chunk.head.major == CborMajorType.MAP +assert chunk.head.argument == 1 +assert chunk.content is None + +data = bytearray.fromhex(hexdata) +used,chunk = cbor_decode_chunk(data, True) +assert used == 3 +assert chunk.head.major == CborMajorType.MAP +assert chunk.head.argument == 1 +print(chunk.content) +assert chunk.content == {cbor_chunk_int(10): cbor_chunk_int(-20)} + + ++ CBOR CODEC special + += special decoding false + +data = bytearray.fromhex('f4') +used,chunk = cbor_decode_chunk(data) +assert used == 1 +assert chunk.tags == tuple() +assert chunk.head.major == CborMajorType.OTHERS +assert chunk.content == CborSimpleValue.FALSE + += special decoding true + +data = bytearray.fromhex('f5') +used,chunk = cbor_decode_chunk(data) +assert used == 1 +assert chunk.tags == tuple() +assert chunk.head.major == CborMajorType.OTHERS +assert chunk.content == CborSimpleValue.TRUE + += special encoding null + +chunk = cbor_chunk_simple(CborSimpleValue.NULL) +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('f6') + += special decoding null + +data = bytearray.fromhex('f6') +used,chunk = cbor_decode_chunk(data) +assert used == 1 +assert chunk.tags == tuple() +assert chunk.head.major == CborMajorType.OTHERS +assert chunk.content == CborSimpleValue.NULL + += special encoding undefined + +chunk = cbor_chunk_simple(CborSimpleValue.UNDEFINED) +data = cbor_encode_chunk(chunk) +assert data == bytearray.fromhex('f7') + += special decoding undefined + +data = bytearray.fromhex('f6') +used,chunk = cbor_decode_chunk(data) +assert used == 1 +assert chunk.tags == tuple() +assert chunk.head.major == CborMajorType.OTHERS +assert chunk.content == CborSimpleValue.NULL + += special encoding float64 + +chunk = cbor_chunk_float(1.5e20) +assert chunk.content == 1.5e20 +data = cbor_encode_chunk(chunk) +print(data.hex()) +assert data == bytearray.fromhex('FB442043561A882930') + += special decoding float64 + +data = bytearray.fromhex('FB442043561A882930') +used,chunk = cbor_decode_chunk(data) +assert used == 9 +assert chunk.tags == tuple() +assert chunk.head.major == CborMajorType.OTHERS +assert chunk.head.argument == bytes.fromhex('442043561A882930') +assert chunk.content == 1.5e20 + + ++ CBOR fields and packets + += array packets + +pkt = 
CborTestPkt(bytearray.fromhex('820102')) +print(pkt.show()) +data = bytes(pkt) +assert len(data) == 3 +pkt2 = CborTestPkt(data) +assert pkt2 == pkt + += default None + +pkt = CborTestPkt() +print(pkt.show()) +data = bytes(pkt) +assert len(data) == 2
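
A rough sketch (not part of the patch) of how these pieces compose into a new
array-framed layer, modeled on how BundleV7 wires CanonicalBlock items via
cbor_array_item_cb in scapy/contrib/bpv7.py above; the ExampleItem and
ExampleList names are illustrative only:

from scapy.fields import PacketListField
from scapy.cborfields import CborUintField
from scapy.cborpacket import CborArrayPacket, cbor_array_item_cb

class ExampleItem(CborArrayPacket):
    # each member is a definite-length two-element array of uints
    fields_desc = [
        CborUintField('kind', default=0),
        CborUintField('val', default=0),
    ]

class ExampleList(CborArrayPacket):
    # encode the outer array as indefinite-length (as BundleV7 does) and let
    # cbor_array_item_cb stop member decoding at the break byte
    cbor_use_indefinite = True
    fields_desc = [
        PacketListField('members', default=[],
                        next_cls_cb=cbor_array_item_cb(ExampleItem)),
    ]

pkt = ExampleList(members=[ExampleItem(kind=1, val=2)])
data = bytes(pkt)   # expected: 9f 82 01 02 ff (indefinite array, one member, break)
assert ExampleList(data).members[0].val == 2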