Skip to content

Commit

Permalink
Merge pull request #92 from kayoub5/feature/comparam_subset
Browse files Browse the repository at this point in the history
Support for COMPARAM-SUBSET
  • Loading branch information
andlaus committed Nov 16, 2022
2 parents fd2414a + 65ad963 commit a4f7dc5
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 118 deletions.
28 changes: 22 additions & 6 deletions examples/somersaultecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from enum import IntEnum
from itertools import chain
from typing import Any
from xml.etree import ElementTree
import pathlib
import odxtools

from odxtools.utils import short_name_as_id
from odxtools import PhysicalConstantParameter
Expand Down Expand Up @@ -34,6 +37,7 @@
import odxtools.uds as uds
from odxtools.odxtypes import DataType
from odxtools.odxlink import OdxLinkId, OdxLinkRef, OdxDocFragment
from odxtools.comparam_subset import ComparamSubset, read_comparam_subset_from_odx

class SomersaultSID(IntEnum):
"""The Somersault-ECU specific service IDs.
Expand All @@ -56,9 +60,9 @@ class SomersaultSID(IntEnum):
doc_frags = [ OdxDocFragment(dlc_short_name, "CONTAINER") ]

# document fragments for communication parameters
cp_dwcan_doc_frags = [ OdxDocFragment("ISO_11898_2_DWCAN", "COMPARAM-SPEC") ]
cp_iso15765_2_doc_frags = [ OdxDocFragment("ISO_15765_2", "COMPARAM-SPEC") ]
cp_iso15765_3_doc_frags = [ OdxDocFragment("ISO_15765_3", "COMPARAM-SPEC") ]
cp_dwcan_doc_frags = [ OdxDocFragment("ISO_11898_2_DWCAN", "COMPARAM-SUBSET") ]
cp_iso15765_2_doc_frags = [ OdxDocFragment("ISO_15765_2", "COMPARAM-SUBSET") ]
cp_iso15765_3_doc_frags = [ OdxDocFragment("ISO_15765_3", "COMPARAM-SUBSET") ]

##################
# Base variant of Somersault ECU
Expand Down Expand Up @@ -1044,7 +1048,7 @@ def extract_constant_bytes(params):

# a response is mandatory
CommunicationParameterRef(
id_ref=OdxLinkRef("ISO_15765_3.ISO_15765_3.CP_TesterPresentReqRsp", cp_iso15765_3_doc_frags),
id_ref=OdxLinkRef("ISO_15765_3.CP_TesterPresentReqRsp", cp_iso15765_3_doc_frags),
value='Response expected'),

# positive response to "tester present"
Expand Down Expand Up @@ -1267,10 +1271,22 @@ def extract_constant_bytes(params):
ecu_variants=[somersault_lazy_diaglayer, somersault_assiduous_diaglayer]
)

# read the communication parameters
comparam_subsets = []
odx_cs_dir = pathlib.Path(odxtools.__file__).parent / "pdx_stub"
for odx_cs_filename in ("ISO_11898_2_DWCAN.odx-cs",
"ISO_11898_3_DWFTCAN.odx-cs",
"ISO_15765_2.odx-cs",
"ISO_15765_3_CPSS.odx-cs"):
odx_cs_root = ElementTree.parse(odx_cs_dir/odx_cs_filename).getroot()
subset = odx_cs_root.find("COMPARAM-SUBSET")
if subset is not None:
comparam_subsets.append(read_comparam_subset_from_odx(subset))

# create a database object
database = Database()
database.diag_layer_containers = NamedItemList(short_name_as_id,
[somersault_dlc])
database._diag_layer_containers = NamedItemList(short_name_as_id, [somersault_dlc])
database._comparam_subsets = NamedItemList(short_name_as_id, comparam_subsets)

# Create ID mapping and resolve references
database.finalize_init()
Expand Down
70 changes: 46 additions & 24 deletions odxtools/communicationparameter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,34 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2022 MBition GmbH

from typing import Optional, List
from typing import Optional, List, TYPE_CHECKING
import warnings

from .odxlink import OdxLinkRef, OdxDocFragment
from odxtools.exceptions import OdxWarning

from .odxlink import OdxLinkDatabase, OdxLinkRef, OdxDocFragment
from .utils import read_description_from_odx
from .comparam_subset import BaseComparam, Comparam, ComplexComparam, read_complex_value_from_odx


def _get_comparam_value(comparam: BaseComparam, value):
if not comparam:
return value
if value is None:
value = comparam.physical_default_value

if isinstance(comparam, Comparam):
return comparam.dop.physical_type.base_data_type.from_string(value)

if isinstance(comparam, ComplexComparam) and isinstance(value, list):
value = value + [None] * (len(comparam.comparams) - len(value))
result = dict()
for i, cp in enumerate(comparam.comparams):
result[cp.short_name] = _get_comparam_value(cp, value[i])
return result

return value


class CommunicationParameterRef:

Expand All @@ -14,41 +38,39 @@ def __init__(self,
description: Optional[str] = None,
protocol_sn_ref: Optional[str] = None,
prot_stack_sn_ref: Optional[str] = None):
self.value = value
self._value = value
self.id_ref = id_ref
self.description = description
self.protocol_sn_ref = protocol_sn_ref
self.prot_stack_sn_ref = prot_stack_sn_ref
self.comparam: Optional[BaseComparam] = None

def __repr__(self) -> str:
val = self.value
if not isinstance(self.value, list):
if isinstance(val, str):
val = f"'{val}'"

return f"CommunicationParameter('{self.id_ref}', value={val})"
return f"CommunicationParameter('{self.short_name}', value={val})"

def __str__(self) -> str:
val = self.value
if not isinstance(self.value, list):
val = f"'{val}'"
return self.__repr__()

return f"CommunicationParameter('{self.id_ref}', value={val})"
def _resolve_references(self, odxlinks: OdxLinkDatabase):
# Temporary lenient until tests are updated
self.comparam = odxlinks.resolve_lenient(self.id_ref)
if not self.comparam:
warnings.warn(f"Could not resolve COMPARAM '{self.id_ref}'", OdxWarning)

def _python_name(self) -> str:
# ODXLINK IDs allow dots and hyphens, but python identifiers
# do not. since _python_name() must return the latter, we have
# to replace these characters...
@property
def short_name(self):
if self.comparam:
return self.comparam.short_name
# ODXLINK IDs allow dots and hyphens, but python identifiers do not.
# This should not happen anyway in a correct PDX
return self.id_ref.ref_id.replace(".", "__").replace("-", "_")


def _read_complex_value_from_odx(et_element):
result = []
for el in et_element.findall("*"):
if el.tag == "SIMPLE-VALUE":
result.append('' if el.text is None else el.text)
else:
result.append(_read_complex_value_from_odx(el))
return result
@property
def value(self):
return _get_comparam_value(self.comparam, self._value)


def read_communication_param_ref_from_odx(et_element, doc_frags: List[OdxDocFragment]):
Expand All @@ -62,7 +84,7 @@ def read_communication_param_ref_from_odx(et_element, doc_frags: List[OdxDocFrag
elif et_element.find("SIMPLE-VALUE") is not None:
value = et_element.findtext("SIMPLE-VALUE")
else:
value = _read_complex_value_from_odx(et_element.find("COMPLEX-VALUE"))
value = read_complex_value_from_odx(et_element.find("COMPLEX-VALUE"))

description = read_description_from_odx(et_element.find("DESC"))
protocol_sn_ref = et_element.findtext("PROTOCOL-SNREF")
Expand Down
212 changes: 212 additions & 0 deletions odxtools/comparam_subset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@


from dataclasses import dataclass, field
from typing import Any, Dict, List, Literal, Optional
from xml.etree.ElementTree import Element

from odxtools.dataobjectproperty import DataObjectProperty, read_data_object_property_from_odx
from odxtools.nameditemlist import NamedItemList
from odxtools.odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId, OdxLinkRef
from odxtools.units import UnitSpec, read_unit_spec_from_odx
from odxtools.utils import read_description_from_odx, short_name_as_id


StandardizationLevel = Literal[
"STANDARD",
"OEM-SPECIFIC",
"OPTIONAL",
"OEM-OPTIONAL",
]

Usage = Literal[
"ECU-SOFTWARE",
"ECU-COMM",
"APPLICATION",
"TESTER",
]


def read_complex_value_from_odx(et_element):
result = []
for el in et_element.findall("*"):
if el.tag == "SIMPLE-VALUE":
result.append('' if el.text is None else el.text)
else:
result.append(read_complex_value_from_odx(el))
return result


@dataclass
class BaseComparam:
odx_id: OdxLinkId
short_name: str
long_name: Optional[str] = field(default=None, init=False)
description: Optional[str] = field(default=None, init=False)
physical_default_value: Any = field(default=None, init=False)
param_class: str
cptype: StandardizationLevel
cpusage: Usage
display_level: Optional[int] = field(default=None, init=False)

def _resolve_references(self, odxlinks: OdxLinkDatabase):
pass

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
return {self.odx_id: self}


@dataclass()
class ComplexComparam(BaseComparam):
comparams: NamedItemList[BaseComparam]
allow_multiple_values: bool = False

def _resolve_references(self, odxlinks: OdxLinkDatabase):
for comparam in self.comparams:
comparam._resolve_references(odxlinks)

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
odxlinks = super()._build_odxlinks()
for comparam in self.comparams:
odxlinks.update(comparam._build_odxlinks())
return odxlinks


@dataclass()
class Comparam(BaseComparam):
dop_ref: OdxLinkRef
_dop: Optional[DataObjectProperty] = field(default=None, init=False)

@property
def dop(self) -> DataObjectProperty:
"""The data object property describing this parameter."""
assert self._dop is not None
return self._dop

def _resolve_references(self, odxlinks: OdxLinkDatabase):
"""Resolves the reference to the dop"""
self._dop = odxlinks.resolve(self.dop_ref)


@dataclass()
class ComparamSubset:
odx_id: Optional[OdxLinkId]
short_name: str
data_object_props: NamedItemList[DataObjectProperty]
comparams: NamedItemList[BaseComparam]
unit_spec: Optional[UnitSpec] = None
long_name: Optional[str] = None
description: Optional[str] = None

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
odxlinks: Dict[OdxLinkId, Any] = {}
if self.odx_id is not None:
odxlinks[self.odx_id] = self

for dop in self.data_object_props:
odxlinks[dop.odx_id] = dop

for comparam in self.comparams:
odxlinks.update(comparam._build_odxlinks())

if self.unit_spec:
odxlinks.update(self.unit_spec._build_odxlinks())

return odxlinks

def _resolve_references(self, odxlinks: OdxLinkDatabase):
for dop in self.data_object_props:
dop._resolve_references(odxlinks)

for comparam in self.comparams:
comparam._resolve_references(odxlinks)

if self.unit_spec:
self.unit_spec._resolve_references(odxlinks)


def read_comparam_from_odx(et_element, doc_frags: List[OdxDocFragment]) -> BaseComparam:
odx_id = OdxLinkId.from_et(et_element, doc_frags)
assert odx_id is not None
short_name = et_element.findtext("SHORT-NAME")
param_class = et_element.attrib.get("PARAM-CLASS")
cptype = et_element.attrib.get("CPTYPE")
cpusage = et_element.attrib.get("CPUSAGE")

comparam: BaseComparam
if et_element.tag == "COMPARAM":
dop_ref = OdxLinkRef.from_et(et_element.find("DATA-OBJECT-PROP-REF"), doc_frags)
assert dop_ref is not None
comparam = Comparam(
odx_id=odx_id,
short_name=short_name,
param_class=param_class,
cptype=cptype,
cpusage=cpusage,
dop_ref=dop_ref,
)
comparam.physical_default_value = et_element.findtext("PHYSICAL-DEFAULT-VALUE")
elif et_element.tag == "COMPLEX-COMPARAM":
comparams = [
read_comparam_from_odx(el, doc_frags)
for el in et_element if el.tag in ('COMPARAM', 'COMPLEX-COMPARAM')
]
comparam = ComplexComparam(
odx_id=odx_id,
short_name=short_name,
param_class=param_class,
cptype=cptype,
cpusage=cpusage,
comparams=NamedItemList(short_name_as_id, comparams),
)
complex_values = et_element.iterfind("COMPLEX-PHYSICAL-DEFAULT-VALUE/COMPLEX-VALUES/COMPLEX-VALUE")
comparam.physical_default_value = list(map(read_complex_value_from_odx, complex_values))
comparam.allow_multiple_values = et_element.get("ALLOW-MULTIPLE-VALUES", "false") == 'true'
else:
assert False, f"Failed to parse COMPARAM {short_name}"

comparam.long_name = et_element.findtext("LONG-NAME")
comparam.description = read_description_from_odx(et_element.find("DESC"))
display_level = et_element.attrib.get("DISPLAY-LEVEL", None)
if display_level is not None:
comparam.display_level = int(display_level)

return comparam


def read_comparam_subset_from_odx(et_element: Element) -> ComparamSubset:

short_name = et_element.findtext("SHORT-NAME")
assert short_name is not None

doc_frags = [OdxDocFragment(short_name, str(et_element.tag))]
odx_id = OdxLinkId.from_et(et_element, doc_frags)
long_name = et_element.findtext("LONG-NAME")
description = read_description_from_odx(et_element.find("DESC"))

data_object_props = [
read_data_object_property_from_odx(el, doc_frags)
for el in et_element.iterfind("DATA-OBJECT-PROPS/DATA-OBJECT-PROP")
]
comparams: List[BaseComparam] = []
comparams += [
read_comparam_from_odx(el, doc_frags)
for el in et_element.iterfind("COMPARAMS/COMPARAM")
]
comparams += [
read_comparam_from_odx(el, doc_frags)
for el in et_element.iterfind("COMPLEX-COMPARAMS/COMPLEX-COMPARAM")
]
if et_element.find("UNIT-SPEC") is not None:
unit_spec = read_unit_spec_from_odx(et_element.find("UNIT-SPEC"), doc_frags)
else:
unit_spec = None

return ComparamSubset(
odx_id=odx_id,
short_name=short_name,
long_name=long_name,
description=description,
data_object_props=NamedItemList(short_name_as_id, data_object_props),
comparams=NamedItemList(short_name_as_id, comparams),
unit_spec=unit_spec,
)
Loading

0 comments on commit a4f7dc5

Please sign in to comment.