From 398ab9596e6379a1d4628266988c1b51fd3e20f8 Mon Sep 17 00:00:00 2001 From: Pier-Yves Lessard Date: Sat, 29 Mar 2025 19:46:56 -0400 Subject: [PATCH 1/2] Added reportWWHOBDDTCWithPermanentStatus + support dtc_class + fix reportWWHOBDDTCByMaskRecord --- doc/source/udsoncan/client.rst | 1 + doc/source/udsoncan/helper_classes.rst | 9 + test/client/test_read_dtc_information.py | 204 +++++++++++++++++++---- udsoncan/client.py | 68 +++++--- udsoncan/common/dtc.py | 93 ++++++++++- udsoncan/services/ReadDTCInformation.py | 70 ++++++-- 6 files changed, 380 insertions(+), 65 deletions(-) diff --git a/doc/source/udsoncan/client.rst b/doc/source/udsoncan/client.rst index 56cc01b..076af14 100755 --- a/doc/source/udsoncan/client.rst +++ b/doc/source/udsoncan/client.rst @@ -438,6 +438,7 @@ Methods by services .. automethod:: udsoncan.client.Client.get_mirrormemory_dtc_by_status_mask .. automethod:: udsoncan.client.Client.get_dtc_by_status_severity_mask .. automethod:: udsoncan.client.Client.get_wwh_obd_dtc_by_status_mask +.. automethod:: udsoncan.client.Client.get_wwh_obd_dtc_with_permanent_status .. automethod:: udsoncan.client.Client.get_number_of_dtc_by_status_mask .. automethod:: udsoncan.client.Client.get_mirrormemory_number_of_dtc_by_status_mask .. automethod:: udsoncan.client.Client.get_number_of_emission_dtc_by_status_mask diff --git a/doc/source/udsoncan/helper_classes.rst b/doc/source/udsoncan/helper_classes.rst index 2ebeef3..44ada29 100755 --- a/doc/source/udsoncan/helper_classes.rst +++ b/doc/source/udsoncan/helper_classes.rst @@ -85,6 +85,15 @@ DTC.Status ----- +.. _DTC_DtcClass: + +DTC.DtcClass +------------ + +.. autoclass:: udsoncan::Dtc.DtcClass + +----- + .. _DTC_Severity: DTC.Severity diff --git a/test/client/test_read_dtc_information.py b/test/client/test_read_dtc_information.py index f7b2fe7..f0978a4 100755 --- a/test/client/test_read_dtc_information.py +++ b/test/client/test_read_dtc_information.py @@ -2584,12 +2584,6 @@ def __init__(self, *args, **kwargs): class TestreportDTCWWHOBDDTCByMaskRecord(ClientServerTest): # Subfn = 0x16 sb = struct.pack('B', 0x42) - badsb = struct.pack('B', 0x42+1) - functional_group_id = 0x1 - status_mask = 0x2 - severity_mask = 0x20 - dtc_class = 0x4 - expected_request_bytes = b'\x19' + sb + bytes([functional_group_id, status_mask, severity_mask | dtc_class]) def assert_no_data_response(self, response): self.assertEqual(len(response.service_data.dtcs), 0) @@ -2597,11 +2591,11 @@ def assert_no_data_response(self, response): def test_no_data(self): request = self.conn.touserqueue.get(timeout=0.2) - self.assertEqual(request, self.expected_request_bytes) - self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4])) + self.assertEqual(request, b'\x19' + self.sb + bytes([1, 2, 0xA5])) + self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0xA0, 0x4])) def _test_no_data(self): - response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, self.severity_mask, self.dtc_class) + response = self.udsclient.get_wwh_obd_dtc_by_status_mask(functional_group_id=1, status_mask=2, severity_mask=0xA0, dtc_class=5) self.assert_no_data_response(response) def test_functional_group_verification(self): @@ -2609,10 +2603,10 @@ def test_functional_group_verification(self): def _test_functional_group_verification(self): with self.assertRaises(ValueError): - response = self.udsclient.get_wwh_obd_dtc_by_status_mask(None, self.status_mask, self.severity_mask, self.dtc_class) + self.udsclient.get_wwh_obd_dtc_by_status_mask(None, status_mask=2, severity_mask=0xA0, dtc_class=4) with self.assertRaises(ValueError): - response = self.udsclient.get_wwh_obd_dtc_by_status_mask(0xff, self.status_mask, self.severity_mask, self.dtc_class) + self.udsclient.get_wwh_obd_dtc_by_status_mask(0xff, status_mask=2, severity_mask=0xA0, dtc_class=4) def assert_no_data_response_with_severity_class(self, response): self.assertEqual(len(response.service_data.dtcs), 0) @@ -2620,67 +2614,215 @@ def assert_no_data_response_with_severity_class(self, response): def test_no_data_with_severity_class(self): request = self.conn.touserqueue.get(timeout=0.2) - self.assertEqual(request, self.expected_request_bytes) - self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4])) + self.assertEqual(request, b'\x19' + self.sb + bytes([0x01, 0x02, 0xA7])) + self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0xA0, 0x4])) def _test_no_data_with_severity_class(self): - severity_class = Dtc.Severity() - severity_class.set_byte(self.severity_mask) - response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, severity_class, self.dtc_class) + severity_obj = Dtc.Severity.from_byte(0xA0) + dtc_class_obj = Dtc.DtcClass.from_byte(0x7) + response = self.udsclient.get_wwh_obd_dtc_by_status_mask(functional_group_id=1, status_mask=2, severity_mask=severity_obj, dtc_class=dtc_class_obj) self.assert_no_data_response(response) def assert_single_data_response(self, response): + assert isinstance(response, Response) + assert isinstance(response.service_data, ReadDTCInformation.ResponseData) self.assertEqual(len(response.service_data.dtcs), 1) self.assertEqual(response.service_data.dtc_count, 1) + self.assertIsNotNone(response.service_data.functional_group_id) + self.assertIsNotNone(response.service_data.status_availability) + self.assertIsNotNone(response.service_data.severity_availability) + self.assertIsNotNone(response.service_data.dtc_format) + + self.assertEqual(response.service_data.functional_group_id, 0xAB) + self.assertEqual(response.service_data.status_availability.get_byte_as_int(), 0x7E) + self.assertEqual(response.service_data.severity_availability.get_byte_as_int(), 0xA0) + self.assertEqual(response.service_data.dtc_format, Dtc.Format.SAE_J2012_DA_DTCFormat_04) # 0x04 + dtc = response.service_data.dtcs[0] self.assertEqual(dtc.id, 0x123456) - self.assertEqual(dtc.status.get_byte_as_int(), 0x20) - self.assertEqual(dtc.severity.get_byte_as_int(), 32) + self.assertEqual(dtc.status.get_byte_as_int(), 0x25) + self.assertEqual(dtc.severity.get_byte_as_int(), 0x20) self.assertEqual(len(dtc.extended_data), 0) def test_single_data(self): request = self.conn.touserqueue.get(timeout=0.2) - self.assertEqual(request, self.expected_request_bytes) - self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4, 0x33, 0x12, 0x34, 0x56, 0x20])) + self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB, 0x02, 0xA7])) + self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB, 0x7E, 0xA0, 0x4, 0x20, 0x12, 0x34, 0x56, 0x25])) def _test_single_data(self): - response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, self.severity_mask, self.dtc_class) + response = self.udsclient.get_wwh_obd_dtc_by_status_mask(functional_group_id=0xAB, status_mask=2, severity_mask=0xA0, dtc_class=0x7) self.assert_single_data_response(response) def assert_multiple_data_response(self, response): + assert isinstance(response, Response) + assert isinstance(response.service_data, ReadDTCInformation.ResponseData) + self.assertEqual(len(response.service_data.dtcs), 2) self.assertEqual(response.service_data.dtc_count, 2) + self.assertIsNotNone(response.service_data.functional_group_id) + self.assertIsNotNone(response.service_data.status_availability) + self.assertIsNotNone(response.service_data.severity_availability) + self.assertIsNotNone(response.service_data.dtc_format) + + self.assertEqual(response.service_data.functional_group_id, 0xAB) + self.assertEqual(response.service_data.status_availability.get_byte_as_int(), 0x7E) + self.assertEqual(response.service_data.severity_availability.get_byte_as_int(), 0xA0) + self.assertEqual(response.service_data.dtc_format, Dtc.Format.SAE_J2012_DA_DTCFormat_04) # 0x04 + dtc = response.service_data.dtcs[0] self.assertEqual(dtc.id, 0x123456) - self.assertEqual(dtc.status.get_byte_as_int(), 0x20) - self.assertEqual(dtc.severity.get_byte_as_int(), 32) + self.assertEqual(dtc.status.get_byte_as_int(), 0x78) + self.assertEqual(dtc.severity.get_byte_as_int(), 0xA0) self.assertEqual(len(dtc.extended_data), 0) dtc = response.service_data.dtcs[1] - self.assertEqual(dtc.id, 0x123457) - self.assertEqual(dtc.status.get_byte_as_int(), 0x20) - self.assertEqual(dtc.severity.get_byte_as_int(), 32) + self.assertEqual(dtc.id, 0x112233) + self.assertEqual(dtc.status.get_byte_as_int(), 0x44) + self.assertEqual(dtc.severity.get_byte_as_int(), 0xE0) self.assertEqual(len(dtc.extended_data), 0) def test_multiple_data(self): request = self.conn.touserqueue.get(timeout=0.2) - self.assertEqual(request, self.expected_request_bytes) - self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4]) - + bytes([0x33, 0x12, 0x34, 0x56, 0x20]) - + bytes([0x33, 0x12, 0x34, 0x57, 0x20])) + self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB, 0x02, 0xA7])) + self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB, 0x7E, 0xA0, 0x4]) + + bytes([0xA0, 0x12, 0x34, 0x56, 0x78]) + + bytes([0xE0, 0x11, 0x22, 0x33, 0x44])) def _test_multiple_data(self): - response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, self.severity_mask, self.dtc_class) + response = self.udsclient.get_wwh_obd_dtc_by_status_mask(functional_group_id=0xAB, status_mask=2, severity_mask=0xA0, dtc_class=0x7) self.assert_multiple_data_response(response) + def test_unexpected_functional_group_id(self): + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB, 0x02, 0xA7])) # AB -> AB+1 + self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB+1, 0x7E, 0xA0, 0x4])) + + def _test_unexpected_functional_group_id(self): + with self.assertRaises(UnexpectedResponseException): + self.udsclient.get_wwh_obd_dtc_by_status_mask(functional_group_id=0xAB, status_mask=2, severity_mask=0xA0, dtc_class=0x7) + +class TestreportWWHOBDDTCWithPermanentStatus(ClientServerTest): # Subfn = 0x16 + sb = struct.pack('B', 0x55) + + def assert_no_data_response(self, response): + self.assertEqual(len(response.service_data.dtcs), 0) + self.assertEqual(response.service_data.dtc_count, 0) + + def test_no_data(self): + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b'\x19' + self.sb + bytes([1])) + self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x4])) + + def _test_no_data(self): + response = self.udsclient.get_wwh_obd_dtc_with_permanent_status(functional_group_id=1) + self.assert_no_data_response(response) + + def test_functional_group_verification(self): + pass + + def _test_functional_group_verification(self): + with self.assertRaises(ValueError): + self.udsclient.get_wwh_obd_dtc_with_permanent_status(None) + + with self.assertRaises(ValueError): + self.udsclient.get_wwh_obd_dtc_with_permanent_status(0xff) + + def assert_no_data_response_with_severity_class(self, response): + self.assertEqual(len(response.service_data.dtcs), 0) + self.assertEqual(response.service_data.dtc_count, 0) + + + def assert_single_data_response(self, response): + assert isinstance(response, Response) + assert isinstance(response.service_data, ReadDTCInformation.ResponseData) + self.assertEqual(len(response.service_data.dtcs), 1) + self.assertEqual(response.service_data.dtc_count, 1) + + self.assertIsNotNone(response.service_data.functional_group_id) + self.assertIsNotNone(response.service_data.status_availability) + self.assertIsNone(response.service_data.severity_availability) + self.assertIsNotNone(response.service_data.dtc_format) + + self.assertEqual(response.service_data.functional_group_id, 0xAB) + self.assertEqual(response.service_data.status_availability.get_byte_as_int(), 0x7E) + self.assertEqual(response.service_data.dtc_format, Dtc.Format.SAE_J2012_DA_DTCFormat_04) # 0x04 + + dtc = response.service_data.dtcs[0] + + self.assertEqual(dtc.id, 0x123456) + self.assertEqual(dtc.status.get_byte_as_int(), 0x33) + self.assertEqual(dtc.severity.get_byte_as_int(), 0x20) + + self.assertEqual(len(dtc.extended_data), 0) + + def test_single_data(self): + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB])) + self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB, 0x7E, 0x4, 0x20, 0x12, 0x34, 0x56, 0x33])) + + def _test_single_data(self): + response = self.udsclient.get_wwh_obd_dtc_with_permanent_status(functional_group_id=0xAB) + self.assert_single_data_response(response) + + def assert_multiple_data_response(self, response): + assert isinstance(response, Response) + assert isinstance(response.service_data, ReadDTCInformation.ResponseData) + + self.assertEqual(len(response.service_data.dtcs), 2) + self.assertEqual(response.service_data.dtc_count, 2) + + self.assertIsNotNone(response.service_data.functional_group_id) + self.assertIsNotNone(response.service_data.status_availability) + self.assertIsNone(response.service_data.severity_availability) + self.assertIsNotNone(response.service_data.dtc_format) + + self.assertEqual(response.service_data.functional_group_id, 0xAB) + self.assertEqual(response.service_data.status_availability.get_byte_as_int(), 0x7E) + self.assertEqual(response.service_data.dtc_format, Dtc.Format.SAE_J2012_DA_DTCFormat_04) # 0x04 + + dtc = response.service_data.dtcs[0] + + self.assertEqual(dtc.id, 0x123456) + self.assertEqual(dtc.status.get_byte_as_int(), 0x78) + self.assertEqual(dtc.severity.get_byte_as_int(), 0xA0) + self.assertEqual(len(dtc.extended_data), 0) + + dtc = response.service_data.dtcs[1] + + self.assertEqual(dtc.id, 0x112233) + self.assertEqual(dtc.status.get_byte_as_int(), 0x44) + self.assertEqual(dtc.severity.get_byte_as_int(), 0xE0) + self.assertEqual(len(dtc.extended_data), 0) + + + def test_multiple_data(self): + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB])) + self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB, 0x7E, 0x4]) + + bytes([0xA0, 0x12, 0x34, 0x56, 0x78]) + + bytes([0xE0, 0x11, 0x22, 0x33, 0x44])) + + def _test_multiple_data(self): + response = self.udsclient.get_wwh_obd_dtc_with_permanent_status(functional_group_id=0xAB) + self.assert_multiple_data_response(response) + + def test_unexpected_functional_group_id(self): + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB])) # AB -> AB+1 + self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB+1, 0x7E, 0x4])) + + def _test_unexpected_functional_group_id(self): + with self.assertRaises(UnexpectedResponseException): + self.udsclient.get_wwh_obd_dtc_with_permanent_status(functional_group_id=0xAB) + class TestreportDTCExtDataRecordByRecordNumber(ClientServerTest): # Subfn = 0x16 sb = struct.pack('B', 0x16) badsb = struct.pack('B', 0x16+1) diff --git a/udsoncan/client.py b/udsoncan/client.py index 320d834..b7c4b7c 100755 --- a/udsoncan/client.py +++ b/udsoncan/client.py @@ -1378,7 +1378,12 @@ def get_dtc_by_status_severity_mask(self, status_mask: int, severity_mask: int) """ return self.read_dtc_information(services.ReadDTCInformation.Subfunction.reportDTCBySeverityMaskRecord, status_mask=status_mask, severity_mask=severity_mask) - def get_wwh_obd_dtc_by_status_mask(self, functional_group_id: int, status_mask: int, severity_mask: Union[int,Dtc.Severity], dtc_class: int) -> Optional[services.ReadDTCInformation.InterpretedResponse]: + def get_wwh_obd_dtc_by_status_mask(self, + functional_group_id: int, + status_mask: int, + severity_mask: Union[int,Dtc.Severity], + dtc_class: Union[int, Dtc.DtcClass] + ) -> Optional[services.ReadDTCInformation.InterpretedResponse]: """ Performs a ``ReadDTCInformation`` service request with subfunction ``reportWWHOBDDTCByMaskRecord`` @@ -1388,29 +1393,43 @@ def get_wwh_obd_dtc_by_status_mask(self, functional_group_id: int, status_mask: :Effective configuration: ``exception_on__response`` ``tolerate_zero_padding`` ``ignore_all_zero_dtc`` - :param functional_group_id: Functional Group ID to search for (FGID) (0x00 to 0xFF) :ref:`Dtc.FunctionalGroupIdentifiers` + :param functional_group_id: Functional Group ID to search for (FGID) (0x00 to 0xFE) :ref:`Dtc.FunctionalGroupIdentifiers` :type functional_group_id: int :param status_mask: The status mask against which the DTCs are tested. :type status_mask: int or :ref:`Dtc.Status` - :param severity_mask: The severity mask against which the DTCs are tested. (Optionas 0x20, 0x40, or 0x80) + :param severity_mask: The severity mask against which the DTCs are tested. (Bit mask of: 0x20, 0x40, or 0x80) :type severity_mask: int or :ref:`Dtc.Severity` - :param dtc_class: The GTR DTC class mask against which the DTCs are tested. (Options 0x01, 0x02, 0x04, 0x08) - :type dtc_class: int + :param dtc_class: The GTR DTC class mask against which the DTCs are tested. (Bit mask of: 0x01, 0x02, 0x04, 0x08, 0x10) + :type dtc_class: int or :ref:`Dtc.DtcClass` :return: The server response parsed by :meth:`ReadDTCInformation.interpret_response` :rtype: :ref:`Response` """ - dtc_severity_mask = dtc_class & 0x1f - if isinstance(severity_mask, Dtc.Severity): - dtc_severity_mask |= (severity_mask.get_byte_as_int() & 0xe0) - else: - dtc_severity_mask |= (severity_mask & 0xe0) - return self.read_dtc_information(services.ReadDTCInformation.Subfunction.reportWWHOBDDTCByMaskRecord, status_mask=status_mask, severity_mask=dtc_severity_mask, functional_group_id=functional_group_id) - def get_number_of_dtc_by_status_mask(self, status_mask: int) -> Optional[services.ReadDTCInformation.InterpretedResponse]: + return self.read_dtc_information(services.ReadDTCInformation.Subfunction.reportWWHOBDDTCByMaskRecord, status_mask=status_mask, severity_mask=severity_mask, dtc_class=dtc_class, functional_group_id=functional_group_id) + + def get_wwh_obd_dtc_with_permanent_status(self, functional_group_id: int) -> Optional[services.ReadDTCInformation.InterpretedResponse]: + """ + Performs a ``ReadDTCInformation`` service request with subfunction ``reportWWHOBDDTCWithPermanentStatus,`` + + Reads all the WWH OBD Diagnostic Trouble Codes that have the specified functional_group and a permanent status. + + :Effective configuration: ``exception_on__response`` ``tolerate_zero_padding`` ``ignore_all_zero_dtc`` + + :param functional_group_id: Functional Group ID to search for (FGID) (0x00 to 0xFE) :ref:`Dtc.FunctionalGroupIdentifiers` + :type functional_group_id: int + + + :return: The server response parsed by :meth:`ReadDTCInformation.interpret_response` + :rtype: :ref:`Response` + """ + + return self.read_dtc_information(services.ReadDTCInformation.Subfunction.reportWWHOBDDTCWithPermanentStatus, functional_group_id=functional_group_id) + + def get_number_of_dtc_by_status_mask(self, status_mask: Union[int, Dtc.Status]) -> Optional[services.ReadDTCInformation.InterpretedResponse]: """ Performs a ``ReadDTCInformation`` service request with subfunction ``reportNumberOfDTCByStatusMask`` @@ -1426,7 +1445,7 @@ def get_number_of_dtc_by_status_mask(self, status_mask: int) -> Optional[service """ return self.read_dtc_information(services.ReadDTCInformation.Subfunction.reportNumberOfDTCByStatusMask, status_mask=status_mask) - def get_mirrormemory_number_of_dtc_by_status_mask(self, status_mask: int) -> Optional[services.ReadDTCInformation.InterpretedResponse]: + def get_mirrormemory_number_of_dtc_by_status_mask(self, status_mask: Union[int, Dtc.Status]) -> Optional[services.ReadDTCInformation.InterpretedResponse]: """ Performs a ``ReadDTCInformation`` service request with subfunction ``reportNumberOfMirrorMemoryDTCByStatusMask`` @@ -1442,7 +1461,7 @@ def get_mirrormemory_number_of_dtc_by_status_mask(self, status_mask: int) -> Opt """ return self.read_dtc_information(services.ReadDTCInformation.Subfunction.reportNumberOfMirrorMemoryDTCByStatusMask, status_mask=status_mask) - def get_number_of_emission_dtc_by_status_mask(self, status_mask: int) -> Optional[services.ReadDTCInformation.InterpretedResponse]: + def get_number_of_emission_dtc_by_status_mask(self, status_mask: Union[int, Dtc.Status]) -> Optional[services.ReadDTCInformation.InterpretedResponse]: """ Performs a ``ReadDTCInformation`` service request with subfunction ``reportNumberOfEmissionsRelatedOBDDTCByStatusMask`` @@ -1458,7 +1477,7 @@ def get_number_of_emission_dtc_by_status_mask(self, status_mask: int) -> Optiona """ return self.read_dtc_information(services.ReadDTCInformation.Subfunction.reportNumberOfEmissionsRelatedOBDDTCByStatusMask, status_mask=status_mask) - def get_number_of_dtc_by_status_severity_mask(self, status_mask: int, severity_mask: int) -> Optional[services.ReadDTCInformation.InterpretedResponse]: + def get_number_of_dtc_by_status_severity_mask(self, status_mask: Union[int, Dtc.Status], severity_mask: Union[int, Dtc.Severity]) -> Optional[services.ReadDTCInformation.InterpretedResponse]: """ Performs a ``ReadDTCInformation`` service request with subfunction ``reportNumberOfDTCBySeverityMaskRecord`` @@ -1776,14 +1795,16 @@ def get_mirrormemory_dtc_extended_data_by_dtc_number(self, dtc: Union[int, Dtc], @standard_error_management def read_dtc_information(self, subfunction: int, - status_mask: Optional[int] = None, - severity_mask: Optional[int] = None, + status_mask: Optional[Union[Dtc.Status, int]] = None, + severity_mask: Optional[Union[Dtc.Severity, int]] = None, dtc: Optional[Union[int, Dtc]] = None, snapshot_record_number: Optional[int] = None, extended_data_record_number: Optional[int] = None, extended_data_size: Optional[int] = None, memory_selection: Optional[int] = None, - functional_group_id: Optional[int] = None): + functional_group_id: Optional[int] = None, + dtc_class: Optional[Union[int, Dtc.DtcClass]] = None + ): if dtc is not None and isinstance(dtc, Dtc): dtc = dtc.id @@ -1795,7 +1816,8 @@ def read_dtc_information(self, extended_data_record_number=extended_data_record_number, memory_selection=memory_selection, standard_version=self.config['standard_version'], - functional_group_id=functional_group_id) + functional_group_id=functional_group_id, + dtc_class=dtc_class) self.logger.info('%s - Sending request with subfunction "%s" (0x%02X).' % (self.service_log_prefix(services.ReadDTCInformation), services.ReadDTCInformation.Subfunction.get_name(subfunction), subfunction)) @@ -1825,7 +1847,7 @@ def read_dtc_information(self, except Exception as e: error = e - # If nothing can be checked, raise the rror right away + # If nothing can be checked, raise the error right away if isinstance(error, InvalidResponseException): if response.service_data is None: raise error @@ -1884,6 +1906,12 @@ def read_dtc_information(self, raise UnexpectedResponseException(response, 'Extended data record number given by the server for DTC 0x%06X has a value of %d but requested record number was %d', ( dtc_obj.id, extended_data.record_number, extended_data_record_number)) + if subfunction in [services.ReadDTCInformation.Subfunction.reportWWHOBDDTCWithPermanentStatus, services.ReadDTCInformation.Subfunction.reportWWHOBDDTCByMaskRecord]: + assert response.service_data.functional_group_id is not None + assert functional_group_id is not None + if functional_group_id != response.service_data.functional_group_id: + raise UnexpectedResponseException(response, "FunctionalGroupIdentifier in the server response does not match the value in the request. Requested 0x%02x, got 0x%02x" % (functional_group_id, response.service_data.functional_group_id)) + if Dtc.Format.get_name(response.service_data.dtc_format) is None: self.logger.warning('Unknown DTC Format Identifier %s. Value should be between 0 and 4' % ('0x%02x' % response.service_data.dtc_format if response.service_data.dtc_format is not None else '') diff --git a/udsoncan/common/dtc.py b/udsoncan/common/dtc.py index 8077622..15a4a79 100644 --- a/udsoncan/common/dtc.py +++ b/udsoncan/common/dtc.py @@ -190,6 +190,12 @@ def set_byte(self, byte: Union[bytes, int]) -> None: self.maintenance_only = True if byte & 0x20 > 0 else False self.check_at_next_exit = True if byte & 0x40 > 0 else False self.check_immediately = True if byte & 0x80 > 0 else False + + @classmethod + def from_byte(cls, byte: Union[bytes, int]) -> "Dtc.Severity": + severity = cls() + severity.set_byte(byte) + return severity @property def available(self): @@ -209,11 +215,89 @@ class ExtendedData: record_number: Optional[int] = None raw_data: Optional[bytes] = b'' + class DtcClass: + """ + Represents a DTC class information which consists of 5 boolean flags. All flags can be set after instantiation without problems. + + :param class0: Bit0 : Unclassified + :type class0: bool + + :param class1: Bit1 : GTR module B Class A definition. + :type class1: bool + + :param class2: Bit2: GTR module B Class B1 definition + :type class2: bool + + :param class3: Bit3: GTR module B Class B2 definition. + :type class3: bool + + :param class4: Bit4: GTR module B Class C definition. + :type class4: bool + """ + + class0: bool + class1: bool + class2: bool + class3: bool + class4: bool + + def __init__(self, + class0: bool = False, + class1: bool = False, + class2: bool = False, + class3: bool = False, + class4: bool = False + ): + self.class0 = class0 + self.class1 = class1 + self.class2 = class2 + self.class3 = class3 + self.class4 = class4 + + def get_byte_as_int(self) -> int: + byte = 0 + byte |= 0x01 if self.class0 else 0 + byte |= 0x02 if self.class1 else 0 + byte |= 0x04 if self.class2 else 0 + byte |= 0x08 if self.class3 else 0 + byte |= 0x10 if self.class4 else 0 + + return byte + + def get_byte(self) -> bytes: + return struct.pack('B', self.get_byte_as_int()) + + def set_byte(self, byte: Union[bytes, int]) -> None: + if not isinstance(byte, int) and not isinstance(byte, bytes): + raise ValueError('Given byte must be an integer or bytes object.') + + if isinstance(byte, bytes): + if len(byte) != 1: + raise ValueError("Expected 1 byte to set the DTC Status") + byte = int(byte[0]) + + self.class0 = True if byte & 0x01 > 0 else False + self.class1 = True if byte & 0x02 > 0 else False + self.class2 = True if byte & 0x04 > 0 else False + self.class3 = True if byte & 0x08 > 0 else False + self.class4 = True if byte & 0x10 > 0 else False + + @property + def available(self): + return True if self.get_byte_as_int() > 0 else False + + @classmethod + def from_byte(cls, byte: Union[bytes, int]) -> "Dtc.DtcClass": + dtc_class = cls() + dtc_class.set_byte(byte) + return dtc_class + id: int status: "Dtc.Status" snapshots: List[Union["Dtc.Snapshot", int]] extended_data: List["Dtc.ExtendedData"] severity: "Dtc.Severity" + dtc_class:"Dtc.DtcClass" functional_unit: Any fault_counter: Optional[int] @@ -223,12 +307,19 @@ def __init__(self, dtcid: int): self.snapshots = [] # . DID codec must be configured self.extended_data = [] self.severity = Dtc.Severity() + self.dtc_class = Dtc.DtcClass() self.functional_unit = None # Implementation specific (ISO 14229 D.4) # Common practice is to detect a specific failure many times before setting the DTC active. This counter should tell the actual count. self.fault_counter = None def __repr__(self) -> str: - return '' % (self.id, self.status.get_byte_as_int(), self.severity.get_byte_as_int(), id(self)) + return '' % ( + self.id, + self.status.get_byte_as_int(), + self.severity.get_byte_as_int(), + self.dtc_class.get_byte_as_int(), + id(self) + ) def id_iso(self) -> str: return '%c%i%03X-%02X' % ( diff --git a/udsoncan/services/ReadDTCInformation.py b/udsoncan/services/ReadDTCInformation.py index 3aa4b88..618381a 100755 --- a/udsoncan/services/ReadDTCInformation.py +++ b/udsoncan/services/ReadDTCInformation.py @@ -57,6 +57,8 @@ class ResponseData(BaseResponseData): status_availability: Optional[Dtc.Status] extended_data: Optional[List[bytes]] memory_selection_echo: Optional[int] + functional_group_id:Optional[int] + severity_availability: Optional[Dtc.Severity] def __init__(self, subfunction_echo: int, dtcs: Optional[List[Dtc]] = None, @@ -64,7 +66,9 @@ def __init__(self, subfunction_echo: int, dtc_format: Optional[int] = None, status_availability: Optional[Dtc.Status] = None, extended_data: Optional[List[bytes]] = [], - memory_selection_echo: Optional[int] = None + memory_selection_echo: Optional[int] = None, + functional_group_id:Optional[int] = None, + severity_availability: Optional[Dtc.Severity] = None ): super().__init__(ReadDTCInformation) self.subfunction_echo = subfunction_echo @@ -74,6 +78,8 @@ def __init__(self, subfunction_echo: int, self.status_availability = status_availability self.extended_data = extended_data if extended_data is not None else [] self.memory_selection_echo = memory_selection_echo + self.functional_group_id = functional_group_id + self.severity_availability=severity_availability class InterpretedResponse(Response): service_data: "ReadDTCInformation.ResponseData" @@ -107,10 +113,10 @@ class Subfunction(BaseSubfunction): reportUserDefMemoryDTCSnapshotRecordByDTCNumber = 0x18 reportUserDefMemoryDTCExtDataRecordByDTCNumber = 0x19 reportWWHOBDDTCByMaskRecord = 0x42 + reportWWHOBDDTCWithPermanentStatus = 0x55 # todo reportSupportedDTCExtDataRecord = 0x1A - reportWWHOBDDTCWithPermanentStatus = 0x55 reportDTCInformationByDTCReadinessGroupIdentifier = 0x56 @classmethod @@ -162,10 +168,10 @@ def assert_extended_data_size_int_or_dict(cls, tools.validate_int(extended_data_size[dtcid], min=0, max=0xFFF, name='Extended data size for DTC=0x%06x' % dtcid) @classmethod - def assert_functional_group_id(cls, functional_group_id: Optional[int], subfunction, maxval: int = 0xFF) -> None: + def assert_functional_group_id(cls, functional_group_id: Optional[int], subfunction) -> None: if functional_group_id is None: raise ValueError('functional_group_id must be provided for subfunction 0x%02x' % subfunction) - tools.validate_int(functional_group_id, min=0, max=maxval, name='Functional Group ID') + tools.validate_int(functional_group_id, min=0, max=0xFE, name='Functional Group ID') @classmethod def pack_dtc(cls, dtcid: int) -> bytes: @@ -204,6 +210,7 @@ def make_request(cls, subfunction: int, status_mask: Optional[Union[int, Dtc.Status]] = None, severity_mask: Optional[Union[int, Dtc.Severity]] = None, + dtc_class: Optional[Union[int, Dtc.DtcClass]] = None, dtc: Optional[Union[int, Dtc]] = None, snapshot_record_number: Optional[int] = None, extended_data_record_number: Optional[int] = None, @@ -224,6 +231,9 @@ def make_request(cls, :param severity_mask: A severity mask used to filter DTC :type severity_mask: int or :ref:`Dtc.Severity ` + :param dtc_class: A DTC class to be packed in the same byte as the severity mask + :type dtc_class: int or :ref:`Dtc.DtcClass ` + :param dtc: A DTC mask used to filter DTC :type dtc: int or :ref:`Dtc ` @@ -307,6 +317,15 @@ def make_request(cls, if severity_mask is not None and isinstance(severity_mask, Dtc.Severity): severity_mask = severity_mask.get_byte_as_int() + severity_mask = severity_mask & 0xE0 + + if dtc_class is not None : + if severity_mask is None: + raise ValueError("A severity mask must be specified in order to specify a dataclass") + + if isinstance(dtc_class, Dtc.DtcClass): + dtc_class = dtc_class.get_byte_as_int() + severity_mask |= (dtc_class & 0x1F) if dtc is not None and isinstance(dtc, Dtc): dtc = dtc.id @@ -372,8 +391,12 @@ def make_request(cls, elif subfunction == ReadDTCInformation.Subfunction.reportWWHOBDDTCByMaskRecord: cls.assert_status_mask(status_mask, subfunction) cls.assert_severity_mask(severity_mask, subfunction) - cls.assert_functional_group_id(functional_group_id, subfunction, maxval=0xFE) # Maximum specified by ISO-14229:2020 + cls.assert_functional_group_id(functional_group_id, subfunction) # Maximum specified by ISO-14229:2020 req.data = struct.pack('BBB', functional_group_id, status_mask, severity_mask) + + elif subfunction == ReadDTCInformation.Subfunction.reportWWHOBDDTCWithPermanentStatus: + cls.assert_functional_group_id(functional_group_id, subfunction) # Maximum specified by ISO-14229:2020 + req.data = struct.pack('B', functional_group_id) return req @@ -886,16 +909,37 @@ def interpret_response(cls, response.service_data.dtc_count = len(response.service_data.dtcs) - elif subfunction == ReadDTCInformation.Subfunction.reportWWHOBDDTCByMaskRecord: - if len(response.data) < 4: - raise InvalidResponseException(response, 'Incomplete response from server.') + elif subfunction in [ReadDTCInformation.Subfunction.reportWWHOBDDTCByMaskRecord, ReadDTCInformation.Subfunction.reportWWHOBDDTCWithPermanentStatus]: + + if subfunction == ReadDTCInformation.Subfunction.reportWWHOBDDTCByMaskRecord: + if len(response.data) < 5: + raise InvalidResponseException(response, 'Incomplete response from server.') + + response.service_data.functional_group_id = response.data[1] + response.service_data.status_availability = Dtc.Status() + response.service_data.status_availability.set_byte(response.data[2]) + response.service_data.severity_availability = Dtc.Severity() + response.service_data.severity_availability.set_byte(response.data[3]) + response.service_data.dtc_format = response.data[4] + remaining_bytes = response.data[5:] + elif subfunction == ReadDTCInformation.Subfunction.reportWWHOBDDTCWithPermanentStatus: + if len(response.data) < 4: + raise InvalidResponseException(response, 'Incomplete response from server.') + + response.service_data.functional_group_id = response.data[1] + response.service_data.status_availability = Dtc.Status() + response.service_data.status_availability.set_byte(response.data[2]) + response.service_data.dtc_format = response.data[3] + remaining_bytes = response.data[4:] + else: + raise NotImplementedError("Unreachable code") - _functional_group_id = response.data[1] - _dtc_status_availability_mask = response.data[2] - _dtc_severity_availability_mask = response.data[3] - _dtc_format_identifier = response.data[4] + if response.service_data.functional_group_id > 0xFE: + raise InvalidResponseException(response, "FunctionalGroupIdentifier returned by the server is not smaller than 0xFE") + + if response.service_data.dtc_format not in [Dtc.Format.SAE_J2012_DA_DTCFormat_04, Dtc.Format.SAE_J1939_73]: + raise InvalidResponseException(response, "DTCFormatIdentifier returned by the server is not one of the following: SAE_J2012-DA_DTCFormat_04 (4), SAE_J1939-73_DTCFormat(2). Got 0x%02x" % response.service_data.dtc_format) - remaining_bytes = response.data[5:] if len(remaining_bytes) % 5 != 0: raise InvalidResponseException(response, 'Incomplete response from server. Remaining bytes must be a multiple of 5') From 72457c5218dd5a186fa938e36ce5e1626bb9179c Mon Sep 17 00:00:00 2001 From: Pier-Yves Lessard Date: Sat, 29 Mar 2025 19:53:16 -0400 Subject: [PATCH 2/2] Cleaner --- udsoncan/services/ReadDTCInformation.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/udsoncan/services/ReadDTCInformation.py b/udsoncan/services/ReadDTCInformation.py index 618381a..4b65d44 100755 --- a/udsoncan/services/ReadDTCInformation.py +++ b/udsoncan/services/ReadDTCInformation.py @@ -916,10 +916,8 @@ def interpret_response(cls, raise InvalidResponseException(response, 'Incomplete response from server.') response.service_data.functional_group_id = response.data[1] - response.service_data.status_availability = Dtc.Status() - response.service_data.status_availability.set_byte(response.data[2]) - response.service_data.severity_availability = Dtc.Severity() - response.service_data.severity_availability.set_byte(response.data[3]) + response.service_data.status_availability = Dtc.Status.from_byte(response.data[2]) + response.service_data.severity_availability = Dtc.Severity.from_byte(response.data[3]) response.service_data.dtc_format = response.data[4] remaining_bytes = response.data[5:] elif subfunction == ReadDTCInformation.Subfunction.reportWWHOBDDTCWithPermanentStatus: @@ -927,20 +925,18 @@ def interpret_response(cls, raise InvalidResponseException(response, 'Incomplete response from server.') response.service_data.functional_group_id = response.data[1] - response.service_data.status_availability = Dtc.Status() - response.service_data.status_availability.set_byte(response.data[2]) + response.service_data.status_availability = Dtc.Status.from_byte(response.data[2]) response.service_data.dtc_format = response.data[3] remaining_bytes = response.data[4:] else: raise NotImplementedError("Unreachable code") if response.service_data.functional_group_id > 0xFE: - raise InvalidResponseException(response, "FunctionalGroupIdentifier returned by the server is not smaller than 0xFE") + raise InvalidResponseException(response, "FunctionalGroupIdentifier returned by the server is not smaller or equal than 0xFE") if response.service_data.dtc_format not in [Dtc.Format.SAE_J2012_DA_DTCFormat_04, Dtc.Format.SAE_J1939_73]: raise InvalidResponseException(response, "DTCFormatIdentifier returned by the server is not one of the following: SAE_J2012-DA_DTCFormat_04 (4), SAE_J1939-73_DTCFormat(2). Got 0x%02x" % response.service_data.dtc_format) - if len(remaining_bytes) % 5 != 0: raise InvalidResponseException(response, 'Incomplete response from server. Remaining bytes must be a multiple of 5')