Skip to content

Commit

Permalink
Merge pull request #292 from andlaus/refactor_encoding2
Browse files Browse the repository at this point in the history
Refactor encoding, part 2 (diag coded types)
  • Loading branch information
andlaus committed Apr 22, 2024
2 parents 29331b3 + 440f8f2 commit 0f12e21
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 131 deletions.
18 changes: 16 additions & 2 deletions odxtools/dataobjectproperty.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,22 @@ def convert_physical_to_bytes(self,
f" is not a valid.")

internal_val = self.convert_physical_to_internal(physical_value)
return self.diag_coded_type.convert_internal_to_bytes(
internal_val, encode_state, bit_position=bit_position)

tmp_state = EncodeState(
bytearray(),
encode_state.parameter_values,
triggering_request=encode_state.triggering_request,
is_end_of_pdu=encode_state.is_end_of_pdu,
cursor_byte_position=0,
cursor_bit_position=bit_position,
origin_byte_position=0)

self.diag_coded_type.encode_into_pdu(internal_val, tmp_state)

encode_state.length_keys.update(tmp_state.length_keys)
encode_state.table_keys.update(tmp_state.table_keys)

return tmp_state.coded_message

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
"""
Expand Down
20 changes: 11 additions & 9 deletions odxtools/diagcodedtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@ def is_highlow_byte_order(self) -> bool:

@staticmethod
def _encode_internal_value(
*,
internal_value: AtomicOdxType,
bit_position: int,
bit_length: int,
base_data_type: DataType,
is_highlow_byte_order: bool,
) -> bytes:
"""Convert the internal_value to bytes."""
"""Convert the internal_value to bytes and emplace this into the PDU"""
# Check that bytes and strings actually fit into the bit length
if base_data_type == DataType.A_BYTEFIELD:
if isinstance(internal_value, bytearray):
Expand Down Expand Up @@ -114,8 +115,9 @@ def _encode_internal_value(
if (base_data_type.value in [
DataType.A_INT32, DataType.A_UINT32, DataType.A_FLOAT32, DataType.A_FLOAT64
] and base_data_type.value != 0):
raise EncodeError(
f"The number {repr(internal_value)} cannot be encoded into {bit_length} bits.")
odxraise(
f"The number {repr(internal_value)} cannot be encoded into {bit_length} bits.",
EncodeError)
return b''

char = ODX_TYPE_TO_FORMAT_LETTER[base_data_type]
Expand Down Expand Up @@ -160,11 +162,10 @@ def _minimal_byte_length_of(self, internal_value: Union[bytes, str]) -> int:
odxassert(
byte_length % 2 == 0, f"The bit length of A_UNICODE2STRING must"
f" be a multiple of 16 but is {8*byte_length}")

return byte_length

@abc.abstractmethod
def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: EncodeState,
bit_position: int) -> bytes:
def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeState) -> None:
"""Encode the internal value.
Parameters
Expand All @@ -177,9 +178,9 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state:
mapping from ID (of the length key) to bit length
(only needed for ParamLengthInfoType)
"""
pass
raise NotImplementedError(
f".encode_into_pdu() is not implemented by the class {type(self).__name__}")

@abc.abstractmethod
def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
"""Decode the parameter value from the coded message.
Expand All @@ -195,4 +196,5 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
int
the next byte position after the extracted parameter
"""
pass
raise NotImplementedError(
f".decode_from_pdu() is not implemented by the class {type(self).__name__}")
18 changes: 15 additions & 3 deletions odxtools/dtcdop.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,21 @@ def convert_physical_to_bytes(self,
f" DiagnosticTroubleCode but got {physical_value!r}.")

internal_trouble_code = self.compu_method.convert_physical_to_internal(trouble_code)

return self.diag_coded_type.convert_internal_to_bytes(
internal_trouble_code, encode_state=encode_state, bit_position=bit_position)
tmp_state = EncodeState(
bytearray(),
encode_state.parameter_values,
triggering_request=encode_state.triggering_request,
is_end_of_pdu=False,
cursor_byte_position=0,
cursor_bit_position=0,
origin_byte_position=0)
encode_state.cursor_bit_position = encode_state.cursor_bit_position
self.diag_coded_type.encode_into_pdu(internal_trouble_code, encode_state=tmp_state)

encode_state.length_keys.update(tmp_state.length_keys)
encode_state.table_keys.update(tmp_state.table_keys)

return tmp_state.coded_message

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
odxlinks = super()._build_odxlinks()
Expand Down
25 changes: 17 additions & 8 deletions odxtools/leadinglengthinfotype.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Any, Optional
from typing import Optional

from typing_extensions import override

from .decodestate import DecodeState
from .diagcodedtype import DctType, DiagCodedType
from .encodestate import EncodeState
from .exceptions import odxassert, odxraise
from .exceptions import EncodeError, odxassert, odxraise
from .odxtypes import AtomicOdxType, DataType


Expand Down Expand Up @@ -42,29 +44,36 @@ def get_static_bit_length(self) -> Optional[int]:
# DCT is dynamic!
return None

def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeState,
bit_position: int) -> bytes:
@override
def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeState) -> None:

if not isinstance(internal_value, (str, bytes)):
odxraise(
f"LEADING-LENGTH-INFO types can only be used for strings and byte fields, "
f"not {type(internal_value).__name__}", EncodeError)
return

byte_length = self._minimal_byte_length_of(internal_value)

length_bytes = self._encode_internal_value(
byte_length,
bit_position=bit_position,
internal_value=byte_length,
bit_position=encode_state.cursor_bit_position,
bit_length=self.bit_length,
base_data_type=DataType.A_UINT32,
is_highlow_byte_order=self.is_highlow_byte_order,
)

value_bytes = self._encode_internal_value(
internal_value,
internal_value=internal_value,
bit_position=0,
bit_length=8 * byte_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
)

return length_bytes + value_bytes
encode_state.emplace_atomic_value(length_bytes + value_bytes, "<LEADING-LENGTH-INFO-TYPE>")

@override
def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:

# Extract length of the parameter value
Expand Down
32 changes: 21 additions & 11 deletions odxtools/minmaxlengthtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from dataclasses import dataclass
from typing import Optional

from typing_extensions import override

from .decodestate import DecodeState
from .diagcodedtype import DctType, DiagCodedType
from .encodestate import EncodeState
Expand Down Expand Up @@ -51,8 +53,9 @@ def __termination_sequence(self) -> bytes:
termination_sequence = bytes([0xFF, 0xFF])
return termination_sequence

def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: EncodeState,
bit_position: int) -> bytes:
@override
def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeState) -> None:

if not isinstance(internal_value, (bytes, str)):
odxraise("MinMaxLengthType is currently only implemented for strings and byte arrays",
EncodeError)
Expand All @@ -64,7 +67,7 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state:

value_bytes = bytearray(
self._encode_internal_value(
internal_value,
internal_value=internal_value,
bit_position=0,
bit_length=8 * data_length,
base_data_type=self.base_data_type,
Expand All @@ -74,7 +77,10 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state:
# TODO: ensure that the termination delimiter is not
# encountered within the encoded value.

odxassert(self.termination != "END-OF-PDU" or encode_state.is_end_of_pdu)
odxassert(
self.termination != "END-OF-PDU" or encode_state.is_end_of_pdu,
"Encountered a MIN-MAX-LENGTH type with END-OF-PDU termination "
"which is not at the end of the PDU")
if encode_state.is_end_of_pdu or len(value_bytes) == self.max_length:
# All termination types may be ended by the end of the PDU
# or once reaching the maximum length. In this case, we
Expand All @@ -90,15 +96,19 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state:
value_bytes.extend(termination_sequence)

if len(value_bytes) < self.min_length:
raise EncodeError(f"Encoded value for MinMaxLengthType "
f"must be at least {self.min_length} bytes long. "
f"(Is: {len(value_bytes)} bytes.)")
odxraise(
f"Encoded value for MinMaxLengthType "
f"must be at least {self.min_length} bytes long. "
f"(Is: {len(value_bytes)} bytes.)", EncodeError)
return
elif self.max_length is not None and len(value_bytes) > self.max_length:
raise EncodeError(f"Encoded value for MinMaxLengthType "
f"must not be longer than {self.max_length} bytes. "
f"(Is: {len(value_bytes)} bytes.)")
odxraise(
f"Encoded value for MinMaxLengthType "
f"must not be longer than {self.max_length} bytes. "
f"(Is: {len(value_bytes)} bytes.)", EncodeError)
return

return value_bytes
encode_state.emplace_atomic_value(value_bytes, "<MIN-MAX-LENGTH-TYPE>")

def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
odxassert(decode_state.cursor_bit_position == 0,
Expand Down
16 changes: 13 additions & 3 deletions odxtools/parameters/codedconstparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,19 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
encode_state.parameter_values[self.short_name] != self.coded_value):
raise TypeError(f"The parameter '{self.short_name}' is constant {self._coded_value_str}"
" and thus can not be changed.")
bit_position_int = self.bit_position if self.bit_position is not None else 0
return self.diag_coded_type.convert_internal_to_bytes(
self.coded_value, encode_state=encode_state, bit_position=bit_position_int)

tmp_state = EncodeState(
bytearray(),
encode_state.parameter_values,
triggering_request=encode_state.triggering_request,
is_end_of_pdu=False,
cursor_byte_position=0,
cursor_bit_position=0,
origin_byte_position=0)
encode_state.cursor_bit_position = self.bit_position or 0
self.diag_coded_type.encode_into_pdu(self.coded_value, encode_state=tmp_state)

return tmp_state.coded_message

@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
Expand Down
4 changes: 3 additions & 1 deletion odxtools/parameters/lengthkeyparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ def is_settable(self) -> bool:

@override
def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
physical_value = encode_state.parameter_values.get(self.short_name, 0)
physical_value = encode_state.length_keys.get(self.short_name)
if physical_value is None:
physical_value = encode_state.parameter_values.get(self.short_name, 0)

bit_pos = self.bit_position or 0
dop = odxrequire(super().dop,
Expand Down
15 changes: 12 additions & 3 deletions odxtools/parameters/nrcconstparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,18 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
# I think it does not matter ...
coded_value = self.coded_values[0]

bit_position_int = self.bit_position if self.bit_position is not None else 0
return self.diag_coded_type.convert_internal_to_bytes(
coded_value, encode_state, bit_position=bit_position_int)
tmp_state = EncodeState(
bytearray(),
encode_state.parameter_values,
triggering_request=encode_state.triggering_request,
is_end_of_pdu=False,
cursor_byte_position=0,
cursor_bit_position=0,
origin_byte_position=0)
encode_state.cursor_bit_position = self.bit_position or 0
self.diag_coded_type.encode_into_pdu(coded_value, encode_state=tmp_state)

return tmp_state.coded_message

@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
Expand Down
54 changes: 32 additions & 22 deletions odxtools/paramlengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, cast

from typing_extensions import override

from .decodestate import DecodeState
from .diagcodedtype import DctType, DiagCodedType
from .encodestate import EncodeState
from .exceptions import odxraise
from .exceptions import EncodeError, odxraise
from .odxlink import OdxLinkDatabase, OdxLinkId, OdxLinkRef
from .odxtypes import AtomicOdxType, DataType

Expand Down Expand Up @@ -43,46 +45,56 @@ def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None:
def length_key(self) -> "LengthKeyParameter":
return self._length_key

def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: EncodeState,
bit_position: int) -> bytes:
bit_length = encode_state.parameter_values.get(self.length_key.short_name, None)
@override
def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeState) -> None:
bit_length = encode_state.length_keys.get(self.length_key.short_name)

if bit_length is None:
# the length key is implicit, i.e., we need to set the
# value for the length key in the encode_state based on
# the value passed here.
if self.base_data_type in [
DataType.A_BYTEFIELD,
DataType.A_ASCIISTRING,
DataType.A_UTF8STRING,
]:
bit_length = 8 * len(internal_value) # type: ignore[arg-type]
if self.base_data_type in [DataType.A_UNICODE2STRING]:
bit_length = 16 * len(internal_value) # type: ignore[arg-type]

if self.base_data_type in [DataType.A_INT32, DataType.A_UINT32]:
bit_length = 8 * len(cast(str, internal_value))
elif self.base_data_type in [DataType.A_UNICODE2STRING]:
bit_length = 16 * len(cast(str, internal_value))
elif self.base_data_type in [DataType.A_INT32, DataType.A_UINT32]:
bit_length = int(internal_value).bit_length()
if self.base_data_type == DataType.A_INT32:
bit_length += 1
# Round up
bit_length = ((bit_length + 7) // 8) * 8

encode_state.parameter_values[self.length_key.short_name] = bit_length

if bit_length is None:
odxraise()

return self._encode_internal_value(
internal_value,
bit_position=bit_position,
elif self.base_data_type == DataType.A_FLOAT32:
bit_length = 32
elif self.base_data_type == DataType.A_FLOAT64:
bit_length = 64
else:
odxraise(
f"Cannot determine size of an object of type "
f"{self.base_data_type.value}", EncodeError)
return

encode_state.length_keys[self.length_key.short_name] = bit_length

raw_data = self._encode_internal_value(
internal_value=internal_value,
bit_position=encode_state.cursor_bit_position,
bit_length=bit_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
)

encode_state.emplace_atomic_value(raw_data, "<PARAM-LENGTH-INFO-TYPE>")

def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
# First, we need to find a length key with matching ID.
if self.length_key.short_name not in decode_state.length_keys:
odxraise(f"Unspecified mandatory length key parameter "
f"{self.length_key.short_name}")
decode_state.cursor_bit_position = None
decode_state.cursor_bit_position = 0
return cast(None, AtomicOdxType)

bit_length = decode_state.length_keys[self.length_key.short_name]
Expand All @@ -91,10 +103,8 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
bit_length = 0

# Extract the internal value and return.
value = decode_state.extract_atomic_value(
return decode_state.extract_atomic_value(
bit_length,
self.base_data_type,
self.is_highlow_byte_order,
)

return value
Loading

0 comments on commit 0f12e21

Please sign in to comment.