Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for COMPARAM-SUBSET #92

Merged
merged 12 commits into from
Nov 16, 2022
28 changes: 22 additions & 6 deletions examples/somersaultecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from enum import IntEnum
from itertools import chain
from typing import Any
from xml.etree import ElementTree
import pathlib
import odxtools

from odxtools.utils import short_name_as_id
from odxtools import PhysicalConstantParameter
Expand Down Expand Up @@ -34,6 +37,7 @@
import odxtools.uds as uds
from odxtools.odxtypes import DataType
from odxtools.odxlink import OdxLinkId, OdxLinkRef, OdxDocFragment
from odxtools.comparam_subset import ComparamSubset, read_comparam_subset_from_odx

class SomersaultSID(IntEnum):
"""The Somersault-ECU specific service IDs.
Expand All @@ -56,9 +60,9 @@ class SomersaultSID(IntEnum):
doc_frags = [ OdxDocFragment(dlc_short_name, "CONTAINER") ]

# document fragments for communication parameters
cp_dwcan_doc_frags = [ OdxDocFragment("ISO_11898_2_DWCAN", "COMPARAM-SPEC") ]
cp_iso15765_2_doc_frags = [ OdxDocFragment("ISO_15765_2", "COMPARAM-SPEC") ]
cp_iso15765_3_doc_frags = [ OdxDocFragment("ISO_15765_3", "COMPARAM-SPEC") ]
cp_dwcan_doc_frags = [ OdxDocFragment("ISO_11898_2_DWCAN", "COMPARAM-SUBSET") ]
cp_iso15765_2_doc_frags = [ OdxDocFragment("ISO_15765_2", "COMPARAM-SUBSET") ]
cp_iso15765_3_doc_frags = [ OdxDocFragment("ISO_15765_3", "COMPARAM-SUBSET") ]

##################
# Base variant of Somersault ECU
Expand Down Expand Up @@ -1044,7 +1048,7 @@ def extract_constant_bytes(params):

# a response is mandatory
CommunicationParameterRef(
id_ref=OdxLinkRef("ISO_15765_3.ISO_15765_3.CP_TesterPresentReqRsp", cp_iso15765_3_doc_frags),
id_ref=OdxLinkRef("ISO_15765_3.CP_TesterPresentReqRsp", cp_iso15765_3_doc_frags),
value='Response expected'),

# positive response to "tester present"
Expand Down Expand Up @@ -1267,10 +1271,22 @@ def extract_constant_bytes(params):
ecu_variants=[somersault_lazy_diaglayer, somersault_assiduous_diaglayer]
)

# read the communication parameters
comparam_subsets = []
odx_cs_dir = pathlib.Path(odxtools.__file__).parent / "pdx_stub"
for odx_cs_filename in ("ISO_11898_2_DWCAN.odx-cs",
"ISO_11898_3_DWFTCAN.odx-cs",
"ISO_15765_2.odx-cs",
"ISO_15765_3_CPSS.odx-cs"):
odx_cs_root = ElementTree.parse(odx_cs_dir/odx_cs_filename).getroot()
subset = odx_cs_root.find("COMPARAM-SUBSET")
if subset is not None:
comparam_subsets.append(read_comparam_subset_from_odx(subset))

# create a database object
database = Database()
database.diag_layer_containers = NamedItemList(short_name_as_id,
[somersault_dlc])
database._diag_layer_containers = NamedItemList(short_name_as_id, [somersault_dlc])
database._comparam_subsets = NamedItemList(short_name_as_id, comparam_subsets)

# Create ID mapping and resolve references
database.finalize_init()
Expand Down
70 changes: 46 additions & 24 deletions odxtools/communicationparameter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,34 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2022 MBition GmbH

from typing import Optional, List
from typing import Optional, List, TYPE_CHECKING
import warnings

from .odxlink import OdxLinkRef, OdxDocFragment
from odxtools.exceptions import OdxWarning

from .odxlink import OdxLinkDatabase, OdxLinkRef, OdxDocFragment
from .utils import read_description_from_odx
from .comparam_subset import BaseComparam, Comparam, ComplexComparam, read_complex_value_from_odx


def _get_comparam_value(comparam: BaseComparam, value):
if not comparam:
return value
if value is None:
value = comparam.physical_default_value

if isinstance(comparam, Comparam):
return comparam.dop.physical_type.base_data_type.from_string(value)

if isinstance(comparam, ComplexComparam) and isinstance(value, list):
value = value + [None] * (len(comparam.comparams) - len(value))
result = dict()
for i, cp in enumerate(comparam.comparams):
result[cp.short_name] = _get_comparam_value(cp, value[i])
return result
kayoub5 marked this conversation as resolved.
Show resolved Hide resolved

return value


class CommunicationParameterRef:

Expand All @@ -14,41 +38,39 @@ def __init__(self,
description: Optional[str] = None,
protocol_sn_ref: Optional[str] = None,
prot_stack_sn_ref: Optional[str] = None):
self.value = value
self._value = value
self.id_ref = id_ref
self.description = description
self.protocol_sn_ref = protocol_sn_ref
self.prot_stack_sn_ref = prot_stack_sn_ref
self.comparam: Optional[BaseComparam] = None

def __repr__(self) -> str:
val = self.value
if not isinstance(self.value, list):
if isinstance(val, str):
val = f"'{val}'"

return f"CommunicationParameter('{self.id_ref}', value={val})"
return f"CommunicationParameter('{self.short_name}', value={val})"

def __str__(self) -> str:
val = self.value
if not isinstance(self.value, list):
val = f"'{val}'"
return self.__repr__()

return f"CommunicationParameter('{self.id_ref}', value={val})"
def _resolve_references(self, odxlinks: OdxLinkDatabase):
# Temporary lenient until tests are updated
self.comparam = odxlinks.resolve_lenient(self.id_ref)
if not self.comparam:
warnings.warn(f"Could not resolve COMPARAM '{self.id_ref}'", OdxWarning)
kayoub5 marked this conversation as resolved.
Show resolved Hide resolved

def _python_name(self) -> str:
# ODXLINK IDs allow dots and hyphens, but python identifiers
# do not. since _python_name() must return the latter, we have
# to replace these characters...
@property
def short_name(self):
if self.comparam:
return self.comparam.short_name
# ODXLINK IDs allow dots and hyphens, but python identifiers do not.
# This should not happen anyway in a correct PDX
return self.id_ref.ref_id.replace(".", "__").replace("-", "_")


def _read_complex_value_from_odx(et_element):
result = []
for el in et_element.findall("*"):
if el.tag == "SIMPLE-VALUE":
result.append('' if el.text is None else el.text)
else:
result.append(_read_complex_value_from_odx(el))
return result
@property
def value(self):
return _get_comparam_value(self.comparam, self._value)


def read_communication_param_ref_from_odx(et_element, doc_frags: List[OdxDocFragment]):
Expand All @@ -62,7 +84,7 @@ def read_communication_param_ref_from_odx(et_element, doc_frags: List[OdxDocFrag
elif et_element.find("SIMPLE-VALUE") is not None:
value = et_element.findtext("SIMPLE-VALUE")
else:
value = _read_complex_value_from_odx(et_element.find("COMPLEX-VALUE"))
value = read_complex_value_from_odx(et_element.find("COMPLEX-VALUE"))

description = read_description_from_odx(et_element.find("DESC"))
protocol_sn_ref = et_element.findtext("PROTOCOL-SNREF")
Expand Down
212 changes: 212 additions & 0 deletions odxtools/comparam_subset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@


from dataclasses import dataclass, field
from typing import Any, Dict, List, Literal, Optional
from xml.etree.ElementTree import Element

from odxtools.dataobjectproperty import DataObjectProperty, read_data_object_property_from_odx
from odxtools.nameditemlist import NamedItemList
from odxtools.odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId, OdxLinkRef
from odxtools.units import UnitSpec, read_unit_spec_from_odx
from odxtools.utils import read_description_from_odx, short_name_as_id


StandardizationLevel = Literal[
"STANDARD",
"OEM-SPECIFIC",
"OPTIONAL",
"OEM-OPTIONAL",
]

Usage = Literal[
"ECU-SOFTWARE",
"ECU-COMM",
"APPLICATION",
"TESTER",
]


def read_complex_value_from_odx(et_element):
result = []
for el in et_element.findall("*"):
if el.tag == "SIMPLE-VALUE":
result.append('' if el.text is None else el.text)
else:
result.append(read_complex_value_from_odx(el))
return result


@dataclass
class BaseComparam:
odx_id: OdxLinkId
short_name: str
long_name: Optional[str] = field(default=None, init=False)
description: Optional[str] = field(default=None, init=False)
physical_default_value: Any = field(default=None, init=False)
param_class: str
cptype: StandardizationLevel
cpusage: Usage
display_level: Optional[int] = field(default=None, init=False)

def _resolve_references(self, odxlinks: OdxLinkDatabase):
pass

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
return {self.odx_id: self}


@dataclass()
class ComplexComparam(BaseComparam):
comparams: NamedItemList[BaseComparam]
allow_multiple_values: bool = False

def _resolve_references(self, odxlinks: OdxLinkDatabase):
for comparam in self.comparams:
comparam._resolve_references(odxlinks)

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
odxlinks = super()._build_odxlinks()
for comparam in self.comparams:
odxlinks.update(comparam._build_odxlinks())
return odxlinks


@dataclass()
class Comparam(BaseComparam):
dop_ref: OdxLinkRef
_dop: Optional[DataObjectProperty] = field(default=None, init=False)

@property
def dop(self) -> DataObjectProperty:
"""The data object property describing this parameter."""
assert self._dop is not None
return self._dop

def _resolve_references(self, odxlinks: OdxLinkDatabase):
"""Resolves the reference to the dop"""
self._dop = odxlinks.resolve(self.dop_ref)


@dataclass()
class ComparamSubset:
odx_id: Optional[OdxLinkId]
short_name: str
data_object_props: NamedItemList[DataObjectProperty]
comparams: NamedItemList[BaseComparam]
unit_spec: Optional[UnitSpec] = None
long_name: Optional[str] = None
description: Optional[str] = None

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
odxlinks: Dict[OdxLinkId, Any] = {}
if self.odx_id is not None:
odxlinks[self.odx_id] = self

for dop in self.data_object_props:
odxlinks[dop.odx_id] = dop

for comparam in self.comparams:
odxlinks.update(comparam._build_odxlinks())

if self.unit_spec:
odxlinks.update(self.unit_spec._build_odxlinks())

return odxlinks

def _resolve_references(self, odxlinks: OdxLinkDatabase):
for dop in self.data_object_props:
dop._resolve_references(odxlinks)

for comparam in self.comparams:
comparam._resolve_references(odxlinks)

if self.unit_spec:
self.unit_spec._resolve_references(odxlinks)


def read_comparam_from_odx(et_element, doc_frags: List[OdxDocFragment]) -> BaseComparam:
odx_id = OdxLinkId.from_et(et_element, doc_frags)
assert odx_id is not None
short_name = et_element.findtext("SHORT-NAME")
param_class = et_element.attrib.get("PARAM-CLASS")
cptype = et_element.attrib.get("CPTYPE")
cpusage = et_element.attrib.get("CPUSAGE")

comparam: BaseComparam
if et_element.tag == "COMPARAM":
dop_ref = OdxLinkRef.from_et(et_element.find("DATA-OBJECT-PROP-REF"), doc_frags)
assert dop_ref is not None
comparam = Comparam(
odx_id=odx_id,
short_name=short_name,
param_class=param_class,
cptype=cptype,
cpusage=cpusage,
dop_ref=dop_ref,
)
comparam.physical_default_value = et_element.findtext("PHYSICAL-DEFAULT-VALUE")
elif et_element.tag == "COMPLEX-COMPARAM":
comparams = [
read_comparam_from_odx(el, doc_frags)
for el in et_element if el.tag in ('COMPARAM', 'COMPLEX-COMPARAM')
]
comparam = ComplexComparam(
odx_id=odx_id,
short_name=short_name,
param_class=param_class,
cptype=cptype,
cpusage=cpusage,
comparams=NamedItemList(short_name_as_id, comparams),
)
complex_values = et_element.iterfind("COMPLEX-PHYSICAL-DEFAULT-VALUE/COMPLEX-VALUES/COMPLEX-VALUE")
comparam.physical_default_value = list(map(read_complex_value_from_odx, complex_values))
comparam.allow_multiple_values = et_element.get("ALLOW-MULTIPLE-VALUES", "false") == 'true'
else:
assert False, f"Failed to parse COMPARAM {short_name}"

comparam.long_name = et_element.findtext("LONG-NAME")
comparam.description = read_description_from_odx(et_element.find("DESC"))
display_level = et_element.attrib.get("DISPLAY-LEVEL", None)
if display_level is not None:
comparam.display_level = int(display_level)

return comparam


def read_comparam_subset_from_odx(et_element: Element) -> ComparamSubset:

short_name = et_element.findtext("SHORT-NAME")
assert short_name is not None

doc_frags = [OdxDocFragment(short_name, str(et_element.tag))]
odx_id = OdxLinkId.from_et(et_element, doc_frags)
long_name = et_element.findtext("LONG-NAME")
description = read_description_from_odx(et_element.find("DESC"))

data_object_props = [
read_data_object_property_from_odx(el, doc_frags)
for el in et_element.iterfind("DATA-OBJECT-PROPS/DATA-OBJECT-PROP")
]
comparams: List[BaseComparam] = []
comparams += [
read_comparam_from_odx(el, doc_frags)
for el in et_element.iterfind("COMPARAMS/COMPARAM")
]
comparams += [
read_comparam_from_odx(el, doc_frags)
for el in et_element.iterfind("COMPLEX-COMPARAMS/COMPLEX-COMPARAM")
]
if et_element.find("UNIT-SPEC") is not None:
unit_spec = read_unit_spec_from_odx(et_element.find("UNIT-SPEC"), doc_frags)
else:
unit_spec = None

return ComparamSubset(
odx_id=odx_id,
short_name=short_name,
long_name=long_name,
description=description,
data_object_props=NamedItemList(short_name_as_id, data_object_props),
comparams=NamedItemList(short_name_as_id, comparams),
unit_spec=unit_spec,
)
Loading