From 2bdce77fa88da0d430925c87297947a5d02f11b7 Mon Sep 17 00:00:00 2001 From: Ayoub Kaanich Date: Sun, 13 Nov 2022 13:30:38 +0100 Subject: [PATCH 01/10] Support for COMPARAM-SUBSET --- odxtools/communicationparameter.py | 70 ++++++---- odxtools/comparam_subset.py | 202 +++++++++++++++++++++++++++++ odxtools/database.py | 56 ++++---- odxtools/diaglayer.py | 43 +++--- odxtools/units.py | 12 +- 5 files changed, 312 insertions(+), 71 deletions(-) create mode 100644 odxtools/comparam_subset.py diff --git a/odxtools/communicationparameter.py b/odxtools/communicationparameter.py index b5b2fe35..5fd7ba0f 100644 --- a/odxtools/communicationparameter.py +++ b/odxtools/communicationparameter.py @@ -1,10 +1,34 @@ # SPDX-License-Identifier: MIT # Copyright (c) 2022 MBition GmbH -from typing import Optional, List +from typing import Optional, List, TYPE_CHECKING +import warnings -from .odxlink import OdxLinkRef, OdxDocFragment +from odxtools.exceptions import OdxWarning + +from .odxlink import OdxLinkDatabase, OdxLinkRef, OdxDocFragment from .utils import read_description_from_odx +from .comparam_subset import BaseComparam, Comparam, ComplexComparam, read_complex_value_from_odx + + +def _get_comparam_value(comparam: BaseComparam, value): + if not comparam: + return value + if value is None: + value = comparam.physical_default_value + + if isinstance(comparam, Comparam): + return comparam.dop.physical_type.base_data_type.from_string(value) + + if isinstance(comparam, ComplexComparam) and isinstance(value, list): + value = value + [None] * (len(comparam.comparams) - len(value)) + result = dict() + for i, cp in enumerate(comparam.comparams): + result[cp.short_name] = _get_comparam_value(cp, value[i]) + return result + + return value + class CommunicationParameterRef: @@ -14,41 +38,39 @@ def __init__(self, description: Optional[str] = None, protocol_sn_ref: Optional[str] = None, prot_stack_sn_ref: Optional[str] = None): - self.value = value + self._value = value self.id_ref = id_ref self.description = description self.protocol_sn_ref = protocol_sn_ref self.prot_stack_sn_ref = prot_stack_sn_ref + self.comparam: Optional[BaseComparam] = None def __repr__(self) -> str: val = self.value - if not isinstance(self.value, list): + if isinstance(val, str): val = f"'{val}'" - - return f"CommunicationParameter('{self.id_ref}', value={val})" + return f"CommunicationParameter('{self.short_name}', value={val})" def __str__(self) -> str: - val = self.value - if not isinstance(self.value, list): - val = f"'{val}'" + return self.__repr__() - return f"CommunicationParameter('{self.id_ref}', value={val})" + def _resolve_references(self, odxlinks: OdxLinkDatabase): + # Temporary lenient until tests are updated + self.comparam = odxlinks.resolve_lenient(self.id_ref) + if not self.comparam: + warnings.warn(f"Could not resolve COMPARAM '{self.id_ref}'", OdxWarning) - def _python_name(self) -> str: - # ODXLINK IDs allow dots and hyphens, but python identifiers - # do not. since _python_name() must return the latter, we have - # to replace these characters... + @property + def short_name(self): + if self.comparam: + return self.comparam.short_name + # ODXLINK IDs allow dots and hyphens, but python identifiers do not. + # This should not happen anyway in a correct PDX return self.id_ref.ref_id.replace(".", "__").replace("-", "_") - -def _read_complex_value_from_odx(et_element): - result = [] - for el in et_element.findall("*"): - if el.tag == "SIMPLE-VALUE": - result.append('' if el.text is None else el.text) - else: - result.append(_read_complex_value_from_odx(el)) - return result + @property + def value(self): + return _get_comparam_value(self.comparam, self._value) def read_communication_param_ref_from_odx(et_element, doc_frags: List[OdxDocFragment]): @@ -62,7 +84,7 @@ def read_communication_param_ref_from_odx(et_element, doc_frags: List[OdxDocFrag elif et_element.find("SIMPLE-VALUE") is not None: value = et_element.findtext("SIMPLE-VALUE") else: - value = _read_complex_value_from_odx(et_element.find("COMPLEX-VALUE")) + value = read_complex_value_from_odx(et_element.find("COMPLEX-VALUE")) description = read_description_from_odx(et_element.find("DESC")) protocol_sn_ref = et_element.findtext("PROTOCOL-SNREF") diff --git a/odxtools/comparam_subset.py b/odxtools/comparam_subset.py new file mode 100644 index 00000000..83877bce --- /dev/null +++ b/odxtools/comparam_subset.py @@ -0,0 +1,202 @@ + + +from dataclasses import dataclass, field +from typing import Any, Dict, List, Literal, Optional + +from odxtools.dataobjectproperty import DataObjectProperty, read_data_object_property_from_odx +from odxtools.nameditemlist import NamedItemList +from odxtools.odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId, OdxLinkRef +from odxtools.units import UnitSpec, read_unit_spec_from_odx +from odxtools.utils import read_description_from_odx, short_name_as_id + + +StandardizationLevel = Literal[ + "STANDARD", + "OEM-SPECIFIC", + "OPTIONAL", + "OEM-OPTIONAL", +] + +Usage = Literal[ + "ECU-SOFTWARE", + "ECU-COMM", + "APPLICATION", + "TESTER", +] + +def read_complex_value_from_odx(et_element): + result = [] + for el in et_element.findall("*"): + if el.tag == "SIMPLE-VALUE": + result.append('' if el.text is None else el.text) + else: + result.append(read_complex_value_from_odx(el)) + return result + +@dataclass +class BaseComparam: + odx_id: OdxLinkId + short_name: str + long_name: Optional[str] = field(default=None, init=False) + description: Optional[str] = field(default=None, init=False) + physical_default_value: Any = field(default=None, init=False) + param_class: str + cptype: StandardizationLevel + cpusage: Usage + display_level: Optional[int] = field(default=None, init=False) + + def _resolve_references(self, odxlinks: OdxLinkDatabase): + pass + + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + return {self.odx_id: self} + + +@dataclass() +class ComplexComparam(BaseComparam): + comparams: NamedItemList[BaseComparam] + allow_multiple_values: bool = False + + def _resolve_references(self, odxlinks: OdxLinkDatabase): + for comparam in self.comparams: + comparam._resolve_references(odxlinks) + + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + odxlinks = super()._build_odxlinks() + for comparam in self.comparams: + odxlinks.update(comparam._build_odxlinks()) + return odxlinks + + +@dataclass() +class Comparam(BaseComparam): + dop_ref: OdxLinkRef + _dop: Optional[DataObjectProperty] = field(default=None, init=False) + + @property + def dop(self) -> DataObjectProperty: + """The data object property describing this parameter.""" + assert self._dop is not None + return self._dop + + def _resolve_references(self, odxlinks: OdxLinkDatabase): + """Resolves the reference to the dop""" + self._dop = odxlinks.resolve(self.dop_ref) + + +@dataclass() +class ComparamSubset: + odx_id: OdxLinkId + short_name: str + data_object_props: NamedItemList[DataObjectProperty] + comparams: NamedItemList[BaseComparam] + unit_spec: Optional[UnitSpec] = None + long_name: Optional[str] = None + description: Optional[str] = None + + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + odxlinks = {self.odx_id: self} + + for dop in self.data_object_props: + odxlinks[dop.odx_id] = dop + + for comparam in self.comparams: + odxlinks.update(comparam._build_odxlinks()) + + if self.unit_spec: + odxlinks.update(self.unit_spec._build_odxlinks()) + + return odxlinks + + def _resolve_references(self, odxlinks: OdxLinkDatabase): + for dop in self.data_object_props: + dop._resolve_references(odxlinks) + + for comparam in self.comparams: + comparam._resolve_references(odxlinks) + + if self.unit_spec: + self.unit_spec._resolve_references(odxlinks) + +def read_comparam_from_odx(et_element, doc_frags: List[OdxDocFragment]) -> BaseComparam: + odx_id = OdxLinkId.from_et(et_element, doc_frags) + short_name = et_element.findtext("SHORT-NAME") + param_class = et_element.attrib.get("PARAM-CLASS") + cptype = et_element.attrib.get("CPTYPE") + cpusage = et_element.attrib.get("CPUSAGE") + + comparam: BaseComparam + if et_element.tag == "COMPARAM": + dop_ref = OdxLinkRef.from_et(et_element.find("DATA-OBJECT-PROP-REF"), doc_frags) + comparam = Comparam( + odx_id=odx_id, + short_name=short_name, + param_class=param_class, + cptype=cptype, + cpusage=cpusage, + dop_ref=dop_ref, + ) + comparam.physical_default_value = et_element.findtext("PHYSICAL-DEFAULT-VALUE") + elif et_element.tag == "COMPLEX-COMPARAM": + comparams = [ + read_comparam_from_odx(el, doc_frags) + for el in et_element if el.tag in ('COMPARAM', 'COMPLEX-COMPARAM') + ] + comparam = ComplexComparam( + odx_id=odx_id, + short_name=short_name, + param_class=param_class, + cptype=cptype, + cpusage=cpusage, + comparams=NamedItemList(short_name_as_id, comparams), + ) + complex_values = et_element.iterfind("COMPLEX-PHYSICAL-DEFAULT-VALUE/COMPLEX-VALUES/COMPLEX-VALUE") + comparam.physical_default_value = list(map(read_complex_value_from_odx, complex_values)) + comparam.allow_multiple_values = et_element.get("ALLOW-MULTIPLE-VALUES", "false") == 'true' + else: + assert False, f"Failed to parse COMPARAM {short_name}" + + comparam.long_name = et_element.findtext("LONG-NAME") + comparam.description = read_description_from_odx(et_element.find("DESC")) + display_level = et_element.attrib.get("DISPLAY-LEVEL", None) + if display_level is not None: + comparam.display_level = int(display_level) + + return comparam + + +def read_comparam_subset_from_odx(et_element) -> ComparamSubset: + + short_name = et_element.findtext("SHORT-NAME") + doc_frags = [OdxDocFragment(short_name, "COMPARAM-SUBSET")] + odx_id = OdxLinkId.from_et(et_element, doc_frags) + long_name = et_element.findtext("LONG-NAME") + description = read_description_from_odx(et_element.find("DESC")) + + data_object_props = [ + read_data_object_property_from_odx(el, doc_frags) + for el in et_element.iterfind("DATA-OBJECT-PROPS/DATA-OBJECT-PROP") + ] + comparams: List[BaseComparam] = [] + comparams += [ + read_comparam_from_odx(el, doc_frags) + for el in et_element.iterfind("COMPARAMS/COMPARAM") + ] + comparams += [ + read_comparam_from_odx(el, doc_frags) + for el in et_element.iterfind("COMPLEX-COMPARAMS/COMPLEX-COMPARAM") + ] + if et_element.find("UNIT-SPEC") is not None: + unit_spec = read_unit_spec_from_odx(et_element.find("UNIT-SPEC"), doc_frags) + else: + unit_spec = None + + return ComparamSubset( + odx_id=odx_id, + short_name=short_name, + long_name=long_name, + description=description, + data_object_props=NamedItemList(short_name_as_id, data_object_props), + comparams=NamedItemList(short_name_as_id, comparams), + unit_spec=unit_spec, + ) diff --git a/odxtools/database.py b/odxtools/database.py index dcf0c07b..5170144a 100644 --- a/odxtools/database.py +++ b/odxtools/database.py @@ -1,9 +1,8 @@ # SPDX-License-Identifier: MIT # Copyright (c) 2022 MBition GmbH -from functools import lru_cache from pathlib import Path -from typing import Any, Optional, Dict, Union +from typing import List, Optional from xml.etree import ElementTree from itertools import chain from zipfile import ZipFile @@ -11,9 +10,11 @@ from .utils import short_name_as_id from .odxlink import OdxLinkDatabase from .diaglayer import DiagLayer, DiagLayerContainer, read_diag_layer_container_from_odx +from .comparam_subset import ComparamSubset, read_comparam_subset_from_odx from .globals import logger from .nameditemlist import NamedItemList + class Database: """This class internalizes the diagnostic database for various ECUs described by a collection of ODX files which are usually collated @@ -25,8 +26,6 @@ def __init__(self, odx_d_file_name: Optional[str] = None, enable_candela_workarounds: bool = True): - dlc_elements = [] - if pdx_zip is None and odx_d_file_name is None: # create an empty database object return @@ -34,6 +33,7 @@ def __init__(self, if pdx_zip is not None and odx_d_file_name is not None: raise TypeError("The 'pdx_zip' and 'odx_d_file_name' parameters are mutually exclusive") + documents = [] if pdx_zip is not None: names = list(pdx_zip.namelist()) names.sort() @@ -44,26 +44,29 @@ def __init__(self, logger.info(f"Processing the file {zip_member}") d = pdx_zip.read(zip_member) root = ElementTree.fromstring(d) - dlc = root.find("DIAG-LAYER-CONTAINER") - if dlc is not None: - dlc_elements.append(dlc) - - tmp = [ - read_diag_layer_container_from_odx(dlc_el, - enable_candela_workarounds=enable_candela_workarounds) \ - for dlc_el in dlc_elements - ] - self._diag_layer_containers = NamedItemList(short_name_as_id, tmp) - self._diag_layer_containers.sort(key=short_name_as_id) - self.finalize_init() + documents.append(root) elif odx_d_file_name is not None: - dlc_element = ElementTree.parse(odx_d_file_name).find("DIAG-LAYER-CONTAINER") - - self._diag_layer_containers = \ - NamedItemList(short_name_as_id, - [read_diag_layer_container_from_odx(dlc_element)]) - self.finalize_init() + documents.append(ElementTree.parse(odx_d_file_name)) + + dlcs: List[DiagLayerContainer] = [] + comparam_subsets: List[ComparamSubset] = [] + for root in documents: + dlc = root.find("DIAG-LAYER-CONTAINER") + if dlc is not None: + dlcs.append(read_diag_layer_container_from_odx( + dlc, + enable_candela_workarounds=enable_candela_workarounds + )) + subset = root.find("COMPARAM-SUBSET") + if subset is not None: + comparam_subsets.append(read_comparam_subset_from_odx(subset)) + + self._diag_layer_containers = NamedItemList(short_name_as_id, dlcs) + self._diag_layer_containers.sort(key=short_name_as_id) + self._comparam_subsets = NamedItemList(short_name_as_id, comparam_subsets) + self._comparam_subsets.sort(key=short_name_as_id) + self.finalize_init() def finalize_init(self) -> None: # Create wrapper objects @@ -77,6 +80,9 @@ def finalize_init(self) -> None: # Build odxlinks self._odxlinks = OdxLinkDatabase() + for subset in self.comparam_subsets: + self.odxlinks.update(subset._build_odxlinks()) + for dlc in self.diag_layer_containers: self.odxlinks.update(dlc._build_odxlinks()) @@ -84,6 +90,8 @@ def finalize_init(self) -> None: self.odxlinks.update(dl._build_odxlinks()) # Resolve references + for subset in self.comparam_subsets: + subset._resolve_references(self.odxlinks) for dlc in self.diag_layer_containers: dlc._resolve_references(self.odxlinks) @@ -114,3 +122,7 @@ def diag_layer_containers(self): @diag_layer_containers.setter def diag_layer_containers(self, value): self._diag_layer_containers = value + + @property + def comparam_subsets(self): + return self._comparam_subsets \ No newline at end of file diff --git a/odxtools/diaglayer.py b/odxtools/diaglayer.py index b12ebf71..16598e32 100644 --- a/odxtools/diaglayer.py +++ b/odxtools/diaglayer.py @@ -12,7 +12,7 @@ from .state import read_state_from_odx from .state_transition import read_state_transition_from_odx -from .odxlink import OdxLinkRef, OdxLinkId,OdxLinkDatabase, OdxDocFragment +from .odxlink import OdxLinkRef, OdxLinkId, OdxLinkDatabase, OdxDocFragment from .utils import read_description_from_odx from .nameditemlist import NamedItemList from .admindata import AdminData, read_admin_data_from_odx @@ -42,7 +42,7 @@ class DiagLayer: class ParentRef: def __init__(self, - parent : Union[OdxLinkRef, "DiagLayer"], + parent: Union[OdxLinkRef, "DiagLayer"], ref_type: str, not_inherited_diag_comms=[], not_inherited_dops=[]): @@ -144,8 +144,7 @@ def __init__(self, self.local_diag_data_dictionary_spec = diag_data_dictionary_spec # Communication parameters, e.g. CAN-IDs - self._local_communication_parameters = NamedItemList[CommunicationParameterRef](lambda cp: cp._python_name(), - communication_parameters) + self._local_communication_parameters = communication_parameters self.additional_audiences = additional_audiences self.functional_classes = functional_classes @@ -156,7 +155,7 @@ def __init__(self, self._services: NamedItemList[Union[DiagService, SingleEcuJob]]\ = NamedItemList(short_name_as_id, []) self._communication_parameters: NamedItemList[CommunicationParameterRef]\ - = NamedItemList(lambda cp: cp._python_name(), []) + = NamedItemList(short_name_as_id, []) self._data_object_properties: NamedItemList[DopBase]\ = NamedItemList(short_name_as_id, []) @@ -243,12 +242,13 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: self._data_object_properties = NamedItemList[DopBase]( short_name_as_id, dops) + for comparam in self._local_communication_parameters: + comparam._resolve_references(odxlinks) - comparams = sorted(self._compute_available_commmunication_parameters_by_name().values(), - key=lambda comparam: comparam.id_ref.ref_id) self._communication_parameters = NamedItemList[CommunicationParameterRef]( - lambda cp: cp._python_name(), - comparams) + short_name_as_id, + list(self._compute_available_commmunication_parameters_by_name().values()) + ) # Resolve all other references for struct in chain(self.requests, @@ -353,7 +353,7 @@ def _build_coded_prefix_tree(self): (a) SIDs for different services are the same like for service 1 and 2 (thus each leaf node is a list) and (b) one SID is the prefix of another SID like for service 3 and 4 (thus the constant `-1` key). """ - services = [ s for s in self._services if isinstance(s, DiagService) ] + services = [s for s in self._services if isinstance(s, DiagService)] prefix_tree = {} for s in services: # Compute prefixes for the request and all responses @@ -418,7 +418,9 @@ def decode(self, message: Union[bytes, bytearray]) -> Iterable[Message]: f"None of the services {possible_services} could parse {message.hex()}.") return decoded_messages - def decode_response(self, response: Union[bytes, bytearray], request: Union[bytes, bytearray, Message]) -> Iterable[Message]: + def decode_response( + self, response: Union[bytes, bytearray], + request: Union[bytes, bytearray, Message]) -> Iterable[Message]: if isinstance(request, Message): possible_services = [request.service] else: @@ -444,9 +446,9 @@ def decode_response(self, response: Union[bytes, bytearray], request: Union[byte return decoded_messages def get_communication_parameter(self, cp_id: str) \ - -> Optional[CommunicationParameterRef]: + -> Optional[CommunicationParameterRef]: - cps = [ cp for cp in self.communication_parameters if cp.id_ref.ref_id == cp_id ] + cps = [cp for cp in self.communication_parameters if cp.id_ref.ref_id == cp_id] if len(cps) > 1: warnings.warn(f"Communication parameter `{cp_id}` specified more " f"than once. Using first occurence.", OdxWarning) @@ -553,12 +555,13 @@ def __str__(self) -> str: def read_parent_ref_from_odx(et_element, doc_frags: List[OdxDocFragment]) \ - -> DiagLayer.ParentRef: + -> DiagLayer.ParentRef: parent_ref = OdxLinkRef.from_et(et_element, doc_frags) assert parent_ref is not None not_inherited_diag_comms = [el.get("SHORT-NAME") - for el in et_element.iterfind("NOT-INHERITED-DIAG-COMMS/NOT-INHERITED-DIAG-COMM/DIAG-COMM-SNREF")] + for el in et_element.iterfind( + "NOT-INHERITED-DIAG-COMMS/NOT-INHERITED-DIAG-COMM/DIAG-COMM-SNREF")] not_inherited_dops = [el.get("SHORT-NAME") for el in et_element.iterfind("NOT-INHERITED-DOPS/NOT-INHERITED-DOP/DOP-BASE-SNREF")] ref_type = et_element.get(f"{xsi}type") @@ -574,7 +577,7 @@ def read_parent_ref_from_odx(et_element, doc_frags: List[OdxDocFragment]) \ def read_diag_layer_from_odx(et_element, doc_frags: List[OdxDocFragment], enable_candela_workarounds=True) \ - -> DiagLayer: + -> DiagLayer: variant_type = et_element.tag @@ -712,9 +715,8 @@ def __init__(self, self.base_variants = base_variants self.ecu_variants = ecu_variants - self._diag_layers = NamedItemList[DiagLayer]( - short_name_as_id, - list(chain(self.ecu_shared_datas, self.protocols, self.functional_groups, self.base_variants, self.ecu_variants))) + self._diag_layers = NamedItemList[DiagLayer](short_name_as_id, list( + chain(self.ecu_shared_datas, self.protocols, self.functional_groups, self.base_variants, self.ecu_variants))) def _build_odxlinks(self): result = {} @@ -737,7 +739,6 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: for cd in self.company_datas: cd._resolve_references(odxlinks) - @property def diag_layers(self): return self._diag_layers @@ -759,7 +760,7 @@ def read_diag_layer_container_from_odx(et_element, enable_candela_workarounds=Tr # create the current ODX "document fragment" (description of the # current document for references and IDs) - doc_frags = [ OdxDocFragment(short_name, "CONTAINER") ] + doc_frags = [OdxDocFragment(short_name, "CONTAINER")] odx_id = OdxLinkId.from_et(et_element, doc_frags) assert odx_id is not None diff --git a/odxtools/units.py b/odxtools/units.py index 306a84d4..30ab54d3 100644 --- a/odxtools/units.py +++ b/odxtools/units.py @@ -3,11 +3,13 @@ from dataclasses import dataclass, field from typing import Dict, List, Any, Literal, Optional, Union +import warnings from .utils import short_name_as_id from .nameditemlist import NamedItemList from .utils import read_description_from_odx from .odxlink import OdxLinkRef, OdxLinkId, OdxLinkDatabase, OdxDocFragment +from .exceptions import OdxWarning UnitGroupCategory = Literal["COUNTRY", "EQUIV-UNITS"] @@ -113,10 +115,12 @@ def __post_init__(self): self._physical_dimension = None if self.factor_si_to_unit is not None or self.offset_si_to_unit is not None or self.physical_dimension_ref is not None: - assert self.factor_si_to_unit is not None and self.offset_si_to_unit is not None and self.physical_dimension_ref is not None, ( - f"Error 54: If one of factor_si_to_unit, offset_si_to_unit and physical_dimension_ref is defined," - f" all of them must be defined: {self.factor_si_to_unit} and {self.offset_si_to_unit} and {self.physical_dimension_ref}" - ) + if self.factor_si_to_unit is not None and self.offset_si_to_unit is not None and self.physical_dimension_ref is not None: + warnings.warn( + f"Error 54: If one of factor_si_to_unit, offset_si_to_unit and physical_dimension_ref is defined," + f" all of them must be defined: {self.factor_si_to_unit} and {self.offset_si_to_unit} and {self.physical_dimension_ref}", + OdxWarning + ) @property def physical_dimension(self) -> PhysicalDimension: From 895e746d1df0893a260dee63813c2910eb36ed16 Mon Sep 17 00:00:00 2001 From: Ayoub Kaanich Date: Sun, 13 Nov 2022 13:47:46 +0100 Subject: [PATCH 02/10] Fix tests --- odxtools/comparam_subset.py | 11 +++++++++-- odxtools/database.py | 4 +++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/odxtools/comparam_subset.py b/odxtools/comparam_subset.py index 83877bce..20a1ede6 100644 --- a/odxtools/comparam_subset.py +++ b/odxtools/comparam_subset.py @@ -24,6 +24,7 @@ "TESTER", ] + def read_complex_value_from_odx(et_element): result = [] for el in et_element.findall("*"): @@ -33,6 +34,7 @@ def read_complex_value_from_odx(et_element): result.append(read_complex_value_from_odx(el)) return result + @dataclass class BaseComparam: odx_id: OdxLinkId @@ -86,7 +88,7 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase): @dataclass() class ComparamSubset: - odx_id: OdxLinkId + odx_id: Optional[OdxLinkId] short_name: str data_object_props: NamedItemList[DataObjectProperty] comparams: NamedItemList[BaseComparam] @@ -95,7 +97,9 @@ class ComparamSubset: description: Optional[str] = None def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: - odxlinks = {self.odx_id: self} + odxlinks: Dict[OdxLinkId, Any] = {} + if self.odx_id is not None: + odxlinks[self.odx_id] = self for dop in self.data_object_props: odxlinks[dop.odx_id] = dop @@ -118,8 +122,10 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase): if self.unit_spec: self.unit_spec._resolve_references(odxlinks) + def read_comparam_from_odx(et_element, doc_frags: List[OdxDocFragment]) -> BaseComparam: odx_id = OdxLinkId.from_et(et_element, doc_frags) + assert odx_id is not None short_name = et_element.findtext("SHORT-NAME") param_class = et_element.attrib.get("PARAM-CLASS") cptype = et_element.attrib.get("CPTYPE") @@ -128,6 +134,7 @@ def read_comparam_from_odx(et_element, doc_frags: List[OdxDocFragment]) -> BaseC comparam: BaseComparam if et_element.tag == "COMPARAM": dop_ref = OdxLinkRef.from_et(et_element.find("DATA-OBJECT-PROP-REF"), doc_frags) + assert dop_ref is not None comparam = Comparam( odx_id=odx_id, short_name=short_name, diff --git a/odxtools/database.py b/odxtools/database.py index 5170144a..a1ff6d09 100644 --- a/odxtools/database.py +++ b/odxtools/database.py @@ -28,6 +28,8 @@ def __init__(self, if pdx_zip is None and odx_d_file_name is None: # create an empty database object + self._diag_layer_containers = NamedItemList(short_name_as_id, []) + self._comparam_subsets = NamedItemList(short_name_as_id, []) return if pdx_zip is not None and odx_d_file_name is not None: @@ -47,7 +49,7 @@ def __init__(self, documents.append(root) elif odx_d_file_name is not None: - documents.append(ElementTree.parse(odx_d_file_name)) + documents.append(ElementTree.parse(odx_d_file_name).getroot()) dlcs: List[DiagLayerContainer] = [] comparam_subsets: List[ComparamSubset] = [] From 38f88aaf5a4566b1d7628285dda5fa8c3abf5c7f Mon Sep 17 00:00:00 2001 From: Ayoub Kaanich Date: Sun, 13 Nov 2022 16:39:23 +0100 Subject: [PATCH 03/10] Support ODX 2.0 --- odxtools/comparam_subset.py | 7 +++++-- odxtools/database.py | 22 ++++++++++++++++++---- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/odxtools/comparam_subset.py b/odxtools/comparam_subset.py index 20a1ede6..a4f0da7c 100644 --- a/odxtools/comparam_subset.py +++ b/odxtools/comparam_subset.py @@ -2,6 +2,7 @@ from dataclasses import dataclass, field from typing import Any, Dict, List, Literal, Optional +from xml.etree.ElementTree import Element from odxtools.dataobjectproperty import DataObjectProperty, read_data_object_property_from_odx from odxtools.nameditemlist import NamedItemList @@ -172,10 +173,12 @@ def read_comparam_from_odx(et_element, doc_frags: List[OdxDocFragment]) -> BaseC return comparam -def read_comparam_subset_from_odx(et_element) -> ComparamSubset: +def read_comparam_subset_from_odx(et_element: Element) -> ComparamSubset: short_name = et_element.findtext("SHORT-NAME") - doc_frags = [OdxDocFragment(short_name, "COMPARAM-SUBSET")] + assert short_name is not None + + doc_frags = [OdxDocFragment(short_name, str(et_element.tag))] odx_id = OdxLinkId.from_et(et_element, doc_frags) long_name = et_element.findtext("LONG-NAME") description = read_description_from_odx(et_element.find("DESC")) diff --git a/odxtools/database.py b/odxtools/database.py index a1ff6d09..c53c0932 100644 --- a/odxtools/database.py +++ b/odxtools/database.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import List, Optional from xml.etree import ElementTree +from xml.etree.ElementTree import Element from itertools import chain from zipfile import ZipFile @@ -14,6 +15,8 @@ from .globals import logger from .nameditemlist import NamedItemList +def version(v: str): + return tuple(map(int, (v.split(".")))) class Database: """This class internalizes the diagnostic database for various ECUs @@ -35,7 +38,7 @@ def __init__(self, if pdx_zip is not None and odx_d_file_name is not None: raise TypeError("The 'pdx_zip' and 'odx_d_file_name' parameters are mutually exclusive") - documents = [] + documents: List[Element] = [] if pdx_zip is not None: names = list(pdx_zip.namelist()) names.sort() @@ -54,15 +57,26 @@ def __init__(self, dlcs: List[DiagLayerContainer] = [] comparam_subsets: List[ComparamSubset] = [] for root in documents: + # ODX spec version + model_version = version(root.attrib.get('MODEL-VERSION', '2.0')) dlc = root.find("DIAG-LAYER-CONTAINER") if dlc is not None: dlcs.append(read_diag_layer_container_from_odx( dlc, enable_candela_workarounds=enable_candela_workarounds )) - subset = root.find("COMPARAM-SUBSET") - if subset is not None: - comparam_subsets.append(read_comparam_subset_from_odx(subset)) + # In ODX 2.0 there was only COMPARAM-SPEC + # In ODX 2.2 content of COMPARAM-SPEC was renamed to COMPARAM-SUBSET + # and COMPARAM-SPEC becomes a container for PROT-STACKS + # and a PROT-STACK references a list of COMPARAM-SUBSET + if model_version >= version('2.2'): + subset = root.find("COMPARAM-SUBSET") + if subset is not None: + comparam_subsets.append(read_comparam_subset_from_odx(subset)) + else: + subset = root.find("COMPARAM-SPEC") + if subset is not None: + comparam_subsets.append(read_comparam_subset_from_odx(subset)) self._diag_layer_containers = NamedItemList(short_name_as_id, dlcs) self._diag_layer_containers.sort(key=short_name_as_id) From 242f77b522088c2d2e2356ada7e5386a1f3cf22f Mon Sep 17 00:00:00 2001 From: Ayoub Kaanich Date: Sun, 13 Nov 2022 16:59:50 +0100 Subject: [PATCH 04/10] Update get_communication_parameter --- odxtools/diaglayer.py | 63 ++++++++++++++++--------------------------- 1 file changed, 23 insertions(+), 40 deletions(-) diff --git a/odxtools/diaglayer.py b/odxtools/diaglayer.py index 16598e32..39744ed4 100644 --- a/odxtools/diaglayer.py +++ b/odxtools/diaglayer.py @@ -445,12 +445,12 @@ def decode_response( f"None of the services {possible_services} could parse {response.hex()}.") return decoded_messages - def get_communication_parameter(self, cp_id: str) \ + def get_communication_parameter(self, name: str) \ -> Optional[CommunicationParameterRef]: - cps = [cp for cp in self.communication_parameters if cp.id_ref.ref_id == cp_id] + cps = [cp for cp in self.communication_parameters if cp.short_name == name] if len(cps) > 1: - warnings.warn(f"Communication parameter `{cp_id}` specified more " + warnings.warn(f"Communication parameter `{name}` specified more " f"than once. Using first occurence.", OdxWarning) elif len(cps) == 0: return None @@ -459,50 +459,29 @@ def get_communication_parameter(self, cp_id: str) \ def get_receive_id(self) -> Optional[int]: """CAN ID to which the ECU listens for diagnostic messages""" - com_param = self.get_communication_parameter("ISO_15765_2.CP_UniqueRespIdTable") - + com_param = self.get_communication_parameter("CP_UniqueRespIdTable") if com_param is None: return None - - if self._enable_candela_workarounds: - # assume the parameter order used by CANdela studio. - # note that the parameter ordering actually used - # differs from the one of the COMPARAM fragment - # delivered by CANdela generated PDX files and that - # both are different from the one of the COMPARAM - # fragment included in the MCD2-D standard - try: - return int(com_param.value[2]) - except ValueError: - return None - else: - # assume the parameter order specified by the COMPARAM - # fragment of the ASAM MCD2-D standard. - return int(com_param.value[1]) + value = com_param.value + if not isinstance(value, dict): + return None + + return value.get('CP_CanRespUSDTId', None) def get_send_id(self) -> Optional[int]: """CAN ID to which the ECU sends replies to diagnostic messages""" - com_param = self.get_communication_parameter("ISO_15765_2.CP_UniqueRespIdTable") - + com_param = self.get_communication_parameter("CP_UniqueRespIdTable") if com_param is None: return None + value = com_param.value + if not isinstance(value, dict): + return None - if self._enable_candela_workarounds: - # assume the parameter order used by CANdela studio. - # note that the parameter odering actually used - # differs from the one of the COMPARAM fragment - # delivered by CANdela generated PDX files and that - # both are different from the one of the COMPARAM - # fragment included in the MCD2-D standard - return int(com_param.value[5]) - else: - # assume the parameter order specified by the COMPARAM - # fragment of the ASAM MCD2-D standard. - return int(com_param.value[4]) + return value.get('CP_CanPhysReqId', None) def get_can_func_req_id(self) -> Optional[int]: """CAN Functional Request Id.""" - com_param = self.get_communication_parameter("ISO_15765_2.CP_CanFuncReqId") + com_param = self.get_communication_parameter("CP_CanFuncReqId") if com_param is None: return None @@ -511,23 +490,27 @@ def get_can_func_req_id(self) -> Optional[int]: def get_logical_doip_address(self) -> Optional[int]: """The logical DoIP address of the ECU.""" - com_param = self.get_communication_parameter("ISO_13400_2_DIS_2015.CP_UniqueRespIdTable") + com_param = self.get_communication_parameter("CP_UniqueRespIdTable") if com_param is None: return None - return int(com_param.value[0]) + value = com_param.value + if not isinstance(value, dict): + return None + + return value.get('CP_CanPhysReqExtAddr', None) def get_tester_present_time(self) -> Optional[float]: """Timeout on inactivity in seconds. - This is defined by the communication parameter "ISO_15765_3.CP_TesterPresentTime". + This is defined by the communication parameter "CP_TesterPresentTime". If the variant does not define this parameter, the default value 3.0 is returned. Description of the comparam: "Time between a response and the next subsequent tester present message (if no other request is sent to this ECU) in case of physically addressed requests." """ - com_param = self.get_communication_parameter("ISO_15765_3.CP_TesterPresentTime") + com_param = self.get_communication_parameter("CP_TesterPresentTime") if com_param is None: return 3.0 # default specified by the standard From ba0ded8d5661e8dfcf0d7a36dd80e4cfa4b25f79 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 15 Nov 2022 14:33:53 +0100 Subject: [PATCH 05/10] somersaultecu: fix the complaints about the communication parameters instead of defining them from scratch, we simply read them from the .odx-cs fragments shipped with the ODX standard. Signed-off-by: Andreas Lauser Signed-off-by: Gerrit Ecke Signed-off-by: Alexander Walz --- examples/somersaultecu.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/examples/somersaultecu.py b/examples/somersaultecu.py index 6d107fb4..c3100727 100755 --- a/examples/somersaultecu.py +++ b/examples/somersaultecu.py @@ -6,6 +6,7 @@ from enum import IntEnum from itertools import chain from typing import Any +from xml.etree import ElementTree from odxtools.utils import short_name_as_id from odxtools import PhysicalConstantParameter @@ -34,6 +35,7 @@ import odxtools.uds as uds from odxtools.odxtypes import DataType from odxtools.odxlink import OdxLinkId, OdxLinkRef, OdxDocFragment +from odxtools.comparam_subset import ComparamSubset, read_comparam_subset_from_odx class SomersaultSID(IntEnum): """The Somersault-ECU specific service IDs. @@ -56,9 +58,9 @@ class SomersaultSID(IntEnum): doc_frags = [ OdxDocFragment(dlc_short_name, "CONTAINER") ] # document fragments for communication parameters -cp_dwcan_doc_frags = [ OdxDocFragment("ISO_11898_2_DWCAN", "COMPARAM-SPEC") ] -cp_iso15765_2_doc_frags = [ OdxDocFragment("ISO_15765_2", "COMPARAM-SPEC") ] -cp_iso15765_3_doc_frags = [ OdxDocFragment("ISO_15765_3", "COMPARAM-SPEC") ] +cp_dwcan_doc_frags = [ OdxDocFragment("ISO_11898_2_DWCAN", "COMPARAM-SUBSET") ] +cp_iso15765_2_doc_frags = [ OdxDocFragment("ISO_15765_2", "COMPARAM-SUBSET") ] +cp_iso15765_3_doc_frags = [ OdxDocFragment("ISO_15765_3", "COMPARAM-SUBSET") ] ################## # Base variant of Somersault ECU @@ -1044,7 +1046,7 @@ def extract_constant_bytes(params): # a response is mandatory CommunicationParameterRef( - id_ref=OdxLinkRef("ISO_15765_3.ISO_15765_3.CP_TesterPresentReqRsp", cp_iso15765_3_doc_frags), + id_ref=OdxLinkRef("ISO_15765_3.CP_TesterPresentReqRsp", cp_iso15765_3_doc_frags), value='Response expected'), # positive response to "tester present" @@ -1267,10 +1269,21 @@ def extract_constant_bytes(params): ecu_variants=[somersault_lazy_diaglayer, somersault_assiduous_diaglayer] ) +# read the communication parameters +comparam_subsets = [] +for odx_cs_filename in ("ISO_11898_2_DWCAN.odx-cs", + "ISO_11898_3_DWFTCAN.odx-cs", + "ISO_15765_2.odx-cs", + "ISO_15765_3_CPSS.odx-cs"): + odx_cs_root = ElementTree.parse(odx_cs_filename).getroot() + subset = odx_cs_root.find("COMPARAM-SUBSET") + if subset is not None: + comparam_subsets.append(read_comparam_subset_from_odx(subset)) + # create a database object database = Database() -database.diag_layer_containers = NamedItemList(short_name_as_id, - [somersault_dlc]) +database._diag_layer_containers = NamedItemList(short_name_as_id, [somersault_dlc]) +database._comparam_subsets = NamedItemList(short_name_as_id, comparam_subsets) # Create ID mapping and resolve references database.finalize_init() From 7afe63c64d33ebc078a6d6f8c8dd6b67f31fe592 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 15 Nov 2022 14:36:37 +0100 Subject: [PATCH 06/10] units: refrain from mandating the SI scaling plus the unit spec if one is defined ... some of the comparam fragments shipped with the ODX standard only specify a unit without any scaling. Assuming that these files are correct, we should not complain about this either? Signed-off-by: Andreas Lauser Signed-off-by: Gerrit Ecke Signed-off-by: Alexander Walz --- odxtools/units.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/odxtools/units.py b/odxtools/units.py index 30ab54d3..4fe28743 100644 --- a/odxtools/units.py +++ b/odxtools/units.py @@ -114,14 +114,6 @@ class Unit: def __post_init__(self): self._physical_dimension = None - if self.factor_si_to_unit is not None or self.offset_si_to_unit is not None or self.physical_dimension_ref is not None: - if self.factor_si_to_unit is not None and self.offset_si_to_unit is not None and self.physical_dimension_ref is not None: - warnings.warn( - f"Error 54: If one of factor_si_to_unit, offset_si_to_unit and physical_dimension_ref is defined," - f" all of them must be defined: {self.factor_si_to_unit} and {self.offset_si_to_unit} and {self.physical_dimension_ref}", - OdxWarning - ) - @property def physical_dimension(self) -> PhysicalDimension: return self._physical_dimension From 96ff67060dac1d6158bef88c146b36702921f014 Mon Sep 17 00:00:00 2001 From: Ayoub Kaanich Date: Tue, 15 Nov 2022 16:35:08 +0100 Subject: [PATCH 07/10] Update odxtools/diaglayer.py --- odxtools/diaglayer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/odxtools/diaglayer.py b/odxtools/diaglayer.py index 39744ed4..1c1972c6 100644 --- a/odxtools/diaglayer.py +++ b/odxtools/diaglayer.py @@ -466,7 +466,7 @@ def get_receive_id(self) -> Optional[int]: if not isinstance(value, dict): return None - return value.get('CP_CanRespUSDTId', None) + return value.get('CP_CanPhysReqId', None) def get_send_id(self) -> Optional[int]: """CAN ID to which the ECU sends replies to diagnostic messages""" From d61a92d80c58f4693fc02e27e22342f8b54840fb Mon Sep 17 00:00:00 2001 From: Ayoub Kaanich Date: Tue, 15 Nov 2022 16:35:59 +0100 Subject: [PATCH 08/10] Update odxtools/diaglayer.py --- odxtools/diaglayer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/odxtools/diaglayer.py b/odxtools/diaglayer.py index 1c1972c6..a32b52a8 100644 --- a/odxtools/diaglayer.py +++ b/odxtools/diaglayer.py @@ -477,7 +477,7 @@ def get_send_id(self) -> Optional[int]: if not isinstance(value, dict): return None - return value.get('CP_CanPhysReqId', None) + return value.get('CP_CanRespUSDTId', None) def get_can_func_req_id(self) -> Optional[int]: """CAN Functional Request Id.""" From 77cd4abe3e4124b2c1368dcf49f3cab269b7993b Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 15 Nov 2022 17:22:43 +0100 Subject: [PATCH 09/10] somersaultecu: load the odx-cs files correctly thanks to [at]kayoub5 for the catch. Signed-off-by: Andreas Lauser Signed-off-by: Gerrit Ecke Signed-off-by: Alexander Walz --- examples/somersaultecu.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/somersaultecu.py b/examples/somersaultecu.py index c3100727..67e20c82 100755 --- a/examples/somersaultecu.py +++ b/examples/somersaultecu.py @@ -7,6 +7,8 @@ from itertools import chain from typing import Any from xml.etree import ElementTree +import pathlib +import odxtools from odxtools.utils import short_name_as_id from odxtools import PhysicalConstantParameter @@ -1271,11 +1273,12 @@ def extract_constant_bytes(params): # read the communication parameters comparam_subsets = [] +odx_cs_dir = pathlib.Path(odxtools.__file__).parent / "pdx_stub" for odx_cs_filename in ("ISO_11898_2_DWCAN.odx-cs", "ISO_11898_3_DWFTCAN.odx-cs", "ISO_15765_2.odx-cs", "ISO_15765_3_CPSS.odx-cs"): - odx_cs_root = ElementTree.parse(odx_cs_filename).getroot() + odx_cs_root = ElementTree.parse(odx_cs_dir/odx_cs_filename).getroot() subset = odx_cs_root.find("COMPARAM-SUBSET") if subset is not None: comparam_subsets.append(read_comparam_subset_from_odx(subset)) From 65ad963ab25606a3b0875066b61db79d20bbcc16 Mon Sep 17 00:00:00 2001 From: Ayoub Kaanich Date: Tue, 15 Nov 2022 19:18:56 +0100 Subject: [PATCH 10/10] Apply suggestions from code review --- odxtools/units.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/odxtools/units.py b/odxtools/units.py index 4fe28743..5c8f872c 100644 --- a/odxtools/units.py +++ b/odxtools/units.py @@ -3,13 +3,11 @@ from dataclasses import dataclass, field from typing import Dict, List, Any, Literal, Optional, Union -import warnings from .utils import short_name_as_id from .nameditemlist import NamedItemList from .utils import read_description_from_odx from .odxlink import OdxLinkRef, OdxLinkId, OdxLinkDatabase, OdxDocFragment -from .exceptions import OdxWarning UnitGroupCategory = Literal["COUNTRY", "EQUIV-UNITS"]