Skip to content

Commit

Permalink
Merge pull request #297 from andlaus/improve_overlap_detection
Browse files Browse the repository at this point in the history
improve detection of overlapping parameters while encoding
  • Loading branch information
andlaus committed Apr 26, 2024
2 parents 3e5ce68 + 3f5012c commit 723be6a
Show file tree
Hide file tree
Showing 16 changed files with 193 additions and 72 deletions.
Binary file modified examples/somersault.pdx
Binary file not shown.
21 changes: 17 additions & 4 deletions examples/somersaultecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from odxtools.docrevision import DocRevision
from odxtools.environmentdata import EnvironmentData
from odxtools.environmentdatadescription import EnvironmentDataDescription
from odxtools.exceptions import odxrequire
from odxtools.functionalclass import FunctionalClass
from odxtools.modification import Modification
from odxtools.multiplexer import Multiplexer
Expand Down Expand Up @@ -669,21 +670,22 @@ class SomersaultSID(IntEnum):
long_name=None,
semantic=None,
description=None,
diag_coded_type=somersault_diagcodedtypes["uint16"],
diag_coded_type=somersault_diagcodedtypes["uint8"],
byte_position=0,
coded_value=uds.positive_response_id(
SID.TesterPresent.value), # type: ignore[attr-defined]
bit_position=None,
sdgs=[],
),
CodedConstParameter(
ValueParameter(
short_name="status",
long_name=None,
semantic=None,
description=None,
diag_coded_type=somersault_diagcodedtypes["uint8"],
dop_ref=OdxLinkRef("somersault.DOP.uint8", doc_frags),
dop_snref=None,
physical_default_value_raw="0",
byte_position=1,
coded_value=0x00,
bit_position=None,
sdgs=[],
),
Expand Down Expand Up @@ -941,6 +943,17 @@ class SomersaultSID(IntEnum):
),
}

# this is a hack to get around a catch-22: we need to specify the
# value of a positive response to the tester present parameter to
# specify ISO_15765_3.CP_TesterPresentMessage communication parameter,
# but we need the comparam for the raw diaglayer which we need for
# retrieving the DOP of the "status" parameter in order to convert the
# raw physical default value.
param = somersault_positive_responses["tester_ok"].parameters.status
assert isinstance(param, ValueParameter)
param._dop = somersault_dops["uint8"]
param._physical_default_value = int(odxrequire(param.physical_default_value_raw))

# negative responses
somersault_negative_responses = {
"general":
Expand Down
4 changes: 3 additions & 1 deletion odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ def encode_into_pdu(self, physical_value: Optional[ParameterValue],
# position directly after the structure and let
# EncodeState add the padding as needed.
encode_state.cursor_byte_position = encode_state.origin_byte_position + self.byte_size
encode_state.emplace_bytes(b'', "<PADDING>")
# Padding bytes needed. these count as "used".
encode_state.coded_message += b"\x00" * (self.byte_size - actual_len)
encode_state.used_mask += b"\xff" * (self.byte_size - actual_len)

# encode the length- and table keys. This cannot be done above
# because we allow these to be defined implicitly (i.e. they
Expand Down
4 changes: 2 additions & 2 deletions odxtools/dataobjectproperty.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ def encode_into_pdu(self, physical_value: ParameterValue, encode_state: EncodeSt
f"The value {repr(physical_value)} of type {type(physical_value).__name__}"
f" is not a valid.")

internal_val = self.convert_physical_to_internal(physical_value)
self.diag_coded_type.encode_into_pdu(internal_val, encode_state)
internal_value = self.convert_physical_to_internal(physical_value)
self.diag_coded_type.encode_into_pdu(internal_value, encode_state)

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
"""
Expand Down
127 changes: 90 additions & 37 deletions odxtools/encodestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ class EncodeState:
"""

#: payload that has been constructed so far
coded_message: bytearray
coded_message: bytearray = field(default_factory=bytearray)

#: the bits of the payload that are used
used_mask: bytearray = field(default_factory=bytearray)

#: The absolute position in bytes from the beginning of the PDU to
#: which relative positions refer to, e.g., the beginning of the
Expand Down Expand Up @@ -53,45 +56,60 @@ class EncodeState:
#: (needed for MinMaxLengthType, EndOfPduField, etc.)
is_end_of_pdu: bool = True

def __post_init__(self) -> None:
# if a coded message has been specified, but no used_mask, we
# assume that all of the bits of the coded message are
# currently used.
if len(self.coded_message) > len(self.used_mask):
self.used_mask += b'\xff' * (len(self.coded_message) - len(self.used_mask))
if len(self.coded_message) < len(self.used_mask):
odxraise(f"The specified bit mask 0x{self.used_mask.hex()} for used bits "
f"is not suitable for representing the coded_message "
f"0x{self.coded_message.hex()}")
self.used_mask = self.used_mask[:len(self.coded_message)]

def emplace_atomic_value(
self,
*,
internal_value: AtomicOdxType,
bit_length: int,
base_data_type: DataType,
is_highlow_byte_order: bool,
used_mask: Optional[bytes],
) -> None:
"""Convert the internal_value to bytes and emplace this into the PDU"""

raw_value: AtomicOdxType

# Check that bytes and strings actually fit into the bit length
if base_data_type == DataType.A_BYTEFIELD:
if isinstance(internal_value, bytearray):
internal_value = bytes(internal_value)
if not isinstance(internal_value, bytes):
odxraise()
if 8 * len(internal_value) > bit_length:
raise EncodeError(f"The bytefield {internal_value.hex()} is too large "
f"({len(internal_value)} bytes)."
f" The maximum length is {bit_length//8}.")
if base_data_type == DataType.A_ASCIISTRING:
raw_value = internal_value
elif base_data_type == DataType.A_ASCIISTRING:
if not isinstance(internal_value, str):
odxraise()

# The spec says ASCII, meaning only byte values 0-127.
# But in practice, vendors use iso-8859-1, aka latin-1
# reason being iso-8859-1 never fails since it has a valid
# character mapping for every possible byte sequence.
internal_value = internal_value.encode("iso-8859-1")
raw_value = internal_value.encode("iso-8859-1")

if 8 * len(internal_value) > bit_length:
if 8 * len(raw_value) > bit_length:
raise EncodeError(f"The string {repr(internal_value)} is too large."
f" The maximum number of characters is {bit_length//8}.")
elif base_data_type == DataType.A_UTF8STRING:
if not isinstance(internal_value, str):
odxraise()

internal_value = internal_value.encode("utf-8")
raw_value = internal_value.encode("utf-8")

if 8 * len(internal_value) > bit_length:
if 8 * len(raw_value) > bit_length:
raise EncodeError(f"The string {repr(internal_value)} is too large."
f" The maximum number of bytes is {bit_length//8}.")

Expand All @@ -100,11 +118,13 @@ def emplace_atomic_value(
odxraise()

text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le"
internal_value = internal_value.encode(text_encoding)
raw_value = internal_value.encode(text_encoding)

if 8 * len(internal_value) > bit_length:
if 8 * len(raw_value) > bit_length:
raise EncodeError(f"The string {repr(internal_value)} is too large."
f" The maximum number of characters is {bit_length//16}.")
else:
raw_value = internal_value

# If the bit length is zero, return empty bytes
if bit_length == 0:
Expand All @@ -125,46 +145,79 @@ def emplace_atomic_value(
left_pad = f"p{padding}" if padding > 0 else ""

# actually encode the value
coded = bitstruct.pack(f"{left_pad}{char}{bit_length}", internal_value)

# apply byte order for numeric objects
coded = bitstruct.pack(f"{left_pad}{char}{bit_length}", raw_value)

# create the raw mask of used bits for numeric objects
used_mask_raw = used_mask
if base_data_type in [DataType.A_INT32, DataType.A_UINT32
] and (self.cursor_bit_position != 0 or
(self.cursor_bit_position + bit_length) % 8 != 0):
if used_mask is None:
tmp = (1 << bit_length) - 1
else:
tmp = int.from_bytes(used_mask, "big")
tmp <<= self.cursor_bit_position

used_mask_raw = tmp.to_bytes((self.cursor_bit_position + bit_length + 7) // 8, "big")

# apply byte order to numeric objects
if not is_highlow_byte_order and base_data_type in [
DataType.A_INT32,
DataType.A_UINT32,
DataType.A_FLOAT32,
DataType.A_FLOAT64,
DataType.A_INT32, DataType.A_UINT32, DataType.A_FLOAT32, DataType.A_FLOAT64
]:
coded = coded[::-1]

self.emplace_bytes(coded)
if used_mask_raw is not None:
used_mask_raw = used_mask_raw[::-1]

self.cursor_bit_position = 0
self.emplace_bytes(coded, obj_used_mask=used_mask_raw)

def emplace_bytes(self,
new_data: bytes,
obj_name: Optional[str] = None,
obj_used_mask: Optional[bytes] = None) -> None:
if self.cursor_bit_position != 0:
odxraise("EncodeState.emplace_bytes can only be called "
"for a bit position of 0!", RuntimeError)

def emplace_bytes(self, new_data: bytes, obj_name: Optional[str] = None) -> None:
pos = self.cursor_byte_position

# Make blob longer if necessary
min_length = pos + len(new_data)
if len(self.coded_message) < min_length:
self.coded_message.extend([0] * (min_length - len(self.coded_message)))

for i in range(len(new_data)):
# insert new byte. this is pretty hacky: it will fail if
# the value to be inserted is bitwise "disjoint" from the
# value which is already in the PDU...
if self.coded_message[pos + i] & new_data[i] != 0:
if obj_name is not None:
warnings.warn(
f"'{obj_name}' overlaps with another object (bits to be set are already set)",
OdxWarning,
stacklevel=1,
)
else:
pad = b'\x00' * (min_length - len(self.coded_message))
self.coded_message += pad
self.used_mask += pad

if obj_used_mask is None:
# Happy path for when no obj_used_mask has been
# specified. In this case we assume that all bits of the
# new data to be emplaced are used.
n = len(new_data)

if self.used_mask[pos:pos + n] != b'\x00' * n:
warnings.warn(
f"Overlapping objects detected in between bytes {pos} and "
f"{pos+n}",
OdxWarning,
stacklevel=1,
)
self.coded_message[pos:pos + n] = new_data
self.used_mask[pos:pos + n] = b'\xff' * n
else:
# insert data the hard way, i.e. we have to look at each
# individual byte to determine if it has already been used
# somewhere else (it would be nice if bytearrays supported
# bitwise operations!)
for i in range(len(new_data)):
if self.used_mask[pos + i] & obj_used_mask[i] != 0:
warnings.warn(
"Object overlap (bits to be set are already set)",
f"Overlapping objects detected at position {pos + i}",
OdxWarning,
stacklevel=1,
)

self.coded_message[pos + i] |= new_data[i]
self.coded_message[pos + i] &= ~obj_used_mask[i]
self.coded_message[pos + i] |= new_data[i] & obj_used_mask[i]
self.used_mask[pos + i] |= obj_used_mask[i]

self.cursor_byte_position += len(new_data)
self.cursor_bit_position = 0
8 changes: 8 additions & 0 deletions odxtools/leadinglengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,23 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta

byte_length = self._minimal_byte_length_of(internal_value)

used_mask = None
bit_pos = encode_state.cursor_bit_position
if encode_state.cursor_bit_position != 0 or (bit_pos + self.bit_length) % 8 != 0:
used_mask = (1 << self.bit_length) - 1
used_mask <<= bit_pos

encode_state.emplace_atomic_value(
internal_value=byte_length,
used_mask=None,
bit_length=self.bit_length,
base_data_type=DataType.A_UINT32,
is_highlow_byte_order=self.is_highlow_byte_order,
)

encode_state.emplace_atomic_value(
internal_value=internal_value,
used_mask=None,
bit_length=8 * byte_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
Expand Down
1 change: 1 addition & 0 deletions odxtools/minmaxlengthtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta
orig_cursor = encode_state.cursor_byte_position
encode_state.emplace_atomic_value(
internal_value=internal_value,
used_mask=None,
bit_length=8 * data_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
Expand Down
2 changes: 1 addition & 1 deletion odxtools/parameterinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def parameter_info(param_list: Iterable[Parameter], quoted_names: bool = False)
of.write(f"{q}{param.short_name}{q}: <matches request>\n")
continue
elif isinstance(param, NrcConstParameter):
of.write(f"{q}{param.short_name}{q}: NRC_const; choices = {param.coded_values}\n")
of.write(f"{q}{param.short_name}{q}: const; choices = {param.coded_values}\n")
continue
elif isinstance(param, ReservedParameter):
of.write(f"{q}{param.short_name}{q}: <reserved>\n")
Expand Down
5 changes: 4 additions & 1 deletion odxtools/parameters/lengthkeyparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ def encode_placeholder_into_pdu(self, physical_value: Optional[ParameterValue],
encode_state.cursor_byte_position = pos
encode_state.cursor_bit_position = self.bit_position or 0

self.dop.encode_into_pdu(encode_state=encode_state, physical_value=0)
# emplace a value of zero into the encode state, but pretend the bits not to be used
n = odxrequire(self.dop.get_static_bit_length()) + encode_state.cursor_bit_position
tmp_val = b'\x00' * ((n + 7) // 8)
encode_state.emplace_bytes(tmp_val, obj_used_mask=tmp_val)

encode_state.cursor_byte_position = max(encode_state.cursor_byte_position, orig_cursor)
encode_state.cursor_bit_position = 0
Expand Down
10 changes: 6 additions & 4 deletions odxtools/parameters/tablekeyparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,16 @@ def encode_placeholder_into_pdu(self, physical_value: Optional[ParameterValue],
odxraise(f"No KEY-DOP specified for table {self.table.short_name}")
return

size = key_dop.get_static_bit_length()

if size is None:
sz = key_dop.get_static_bit_length()
if sz is None:
odxraise("The DOP of table key {self.short_name} must exhibit a fixed size.",
EncodeError)
return

encode_state.emplace_bytes(bytes([0] * (size // 8)), self.short_name)
# emplace a value of zero into the encode state, but pretend the bits not to be used
n = sz + encode_state.cursor_bit_position
tmp_val = b'\x00' * ((n + 7) // 8)
encode_state.emplace_bytes(tmp_val, obj_used_mask=tmp_val)

encode_state.cursor_byte_position = max(orig_pos, encode_state.cursor_byte_position)
encode_state.cursor_bit_position = 0
Expand Down
1 change: 1 addition & 0 deletions odxtools/paramlengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta

encode_state.emplace_atomic_value(
internal_value=internal_value,
used_mask=None,
bit_length=bit_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
Expand Down
3 changes: 1 addition & 2 deletions odxtools/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) ->
return Request(**kwargs)

def encode(self, **kwargs: ParameterValue) -> bytes:
encode_state = EncodeState(
coded_message=bytearray(), triggering_request=None, is_end_of_pdu=True)
encode_state = EncodeState(is_end_of_pdu=True)

self.encode_into_pdu(physical_value=kwargs, encode_state=encode_state)

Expand Down
3 changes: 1 addition & 2 deletions odxtools/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) ->
return Response(response_type=response_type, **kwargs)

def encode(self, coded_request: Optional[bytes] = None, **kwargs: ParameterValue) -> bytes:
encode_state = EncodeState(
coded_message=bytearray(), triggering_request=coded_request, is_end_of_pdu=True)
encode_state = EncodeState(triggering_request=coded_request, is_end_of_pdu=True)

self.encode_into_pdu(physical_value=kwargs, encode_state=encode_state)

Expand Down
Loading

0 comments on commit 723be6a

Please sign in to comment.