Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/source/udsoncan/client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ Methods by services
.. automethod:: udsoncan.client.Client.get_mirrormemory_dtc_by_status_mask
.. automethod:: udsoncan.client.Client.get_dtc_by_status_severity_mask
.. automethod:: udsoncan.client.Client.get_wwh_obd_dtc_by_status_mask
.. automethod:: udsoncan.client.Client.get_wwh_obd_dtc_with_permanent_status
.. automethod:: udsoncan.client.Client.get_number_of_dtc_by_status_mask
.. automethod:: udsoncan.client.Client.get_mirrormemory_number_of_dtc_by_status_mask
.. automethod:: udsoncan.client.Client.get_number_of_emission_dtc_by_status_mask
Expand Down
9 changes: 9 additions & 0 deletions doc/source/udsoncan/helper_classes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ DTC.Status

-----

.. _DTC_DtcClass:

DTC.DtcClass
------------

.. autoclass:: udsoncan::Dtc.DtcClass

-----

.. _DTC_Severity:

DTC.Severity
Expand Down
204 changes: 173 additions & 31 deletions test/client/test_read_dtc_information.py
Original file line number Diff line number Diff line change
Expand Up @@ -2584,103 +2584,245 @@ def __init__(self, *args, **kwargs):

class TestreportDTCWWHOBDDTCByMaskRecord(ClientServerTest): # Subfn = 0x16
sb = struct.pack('B', 0x42)
badsb = struct.pack('B', 0x42+1)
functional_group_id = 0x1
status_mask = 0x2
severity_mask = 0x20
dtc_class = 0x4
expected_request_bytes = b'\x19' + sb + bytes([functional_group_id, status_mask, severity_mask | dtc_class])

def assert_no_data_response(self, response):
self.assertEqual(len(response.service_data.dtcs), 0)
self.assertEqual(response.service_data.dtc_count, 0)

def test_no_data(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, self.expected_request_bytes)
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4]))
self.assertEqual(request, b'\x19' + self.sb + bytes([1, 2, 0xA5]))
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0xA0, 0x4]))

def _test_no_data(self):
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, self.severity_mask, self.dtc_class)
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(functional_group_id=1, status_mask=2, severity_mask=0xA0, dtc_class=5)
self.assert_no_data_response(response)

def test_functional_group_verification(self):
pass

def _test_functional_group_verification(self):
with self.assertRaises(ValueError):
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(None, self.status_mask, self.severity_mask, self.dtc_class)
self.udsclient.get_wwh_obd_dtc_by_status_mask(None, status_mask=2, severity_mask=0xA0, dtc_class=4)

with self.assertRaises(ValueError):
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(0xff, self.status_mask, self.severity_mask, self.dtc_class)
self.udsclient.get_wwh_obd_dtc_by_status_mask(0xff, status_mask=2, severity_mask=0xA0, dtc_class=4)

def assert_no_data_response_with_severity_class(self, response):
self.assertEqual(len(response.service_data.dtcs), 0)
self.assertEqual(response.service_data.dtc_count, 0)

def test_no_data_with_severity_class(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, self.expected_request_bytes)
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4]))
self.assertEqual(request, b'\x19' + self.sb + bytes([0x01, 0x02, 0xA7]))
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0xA0, 0x4]))

def _test_no_data_with_severity_class(self):
severity_class = Dtc.Severity()
severity_class.set_byte(self.severity_mask)
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, severity_class, self.dtc_class)
severity_obj = Dtc.Severity.from_byte(0xA0)
dtc_class_obj = Dtc.DtcClass.from_byte(0x7)
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(functional_group_id=1, status_mask=2, severity_mask=severity_obj, dtc_class=dtc_class_obj)
self.assert_no_data_response(response)

def assert_single_data_response(self, response):
assert isinstance(response, Response)
assert isinstance(response.service_data, ReadDTCInformation.ResponseData)
self.assertEqual(len(response.service_data.dtcs), 1)
self.assertEqual(response.service_data.dtc_count, 1)

self.assertIsNotNone(response.service_data.functional_group_id)
self.assertIsNotNone(response.service_data.status_availability)
self.assertIsNotNone(response.service_data.severity_availability)
self.assertIsNotNone(response.service_data.dtc_format)

self.assertEqual(response.service_data.functional_group_id, 0xAB)
self.assertEqual(response.service_data.status_availability.get_byte_as_int(), 0x7E)
self.assertEqual(response.service_data.severity_availability.get_byte_as_int(), 0xA0)
self.assertEqual(response.service_data.dtc_format, Dtc.Format.SAE_J2012_DA_DTCFormat_04) # 0x04

dtc = response.service_data.dtcs[0]

self.assertEqual(dtc.id, 0x123456)
self.assertEqual(dtc.status.get_byte_as_int(), 0x20)
self.assertEqual(dtc.severity.get_byte_as_int(), 32)
self.assertEqual(dtc.status.get_byte_as_int(), 0x25)
self.assertEqual(dtc.severity.get_byte_as_int(), 0x20)

self.assertEqual(len(dtc.extended_data), 0)

def test_single_data(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, self.expected_request_bytes)
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4, 0x33, 0x12, 0x34, 0x56, 0x20]))
self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB, 0x02, 0xA7]))
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB, 0x7E, 0xA0, 0x4, 0x20, 0x12, 0x34, 0x56, 0x25]))

def _test_single_data(self):
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, self.severity_mask, self.dtc_class)
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(functional_group_id=0xAB, status_mask=2, severity_mask=0xA0, dtc_class=0x7)
self.assert_single_data_response(response)

def assert_multiple_data_response(self, response):
assert isinstance(response, Response)
assert isinstance(response.service_data, ReadDTCInformation.ResponseData)

self.assertEqual(len(response.service_data.dtcs), 2)
self.assertEqual(response.service_data.dtc_count, 2)

self.assertIsNotNone(response.service_data.functional_group_id)
self.assertIsNotNone(response.service_data.status_availability)
self.assertIsNotNone(response.service_data.severity_availability)
self.assertIsNotNone(response.service_data.dtc_format)

self.assertEqual(response.service_data.functional_group_id, 0xAB)
self.assertEqual(response.service_data.status_availability.get_byte_as_int(), 0x7E)
self.assertEqual(response.service_data.severity_availability.get_byte_as_int(), 0xA0)
self.assertEqual(response.service_data.dtc_format, Dtc.Format.SAE_J2012_DA_DTCFormat_04) # 0x04

dtc = response.service_data.dtcs[0]

self.assertEqual(dtc.id, 0x123456)
self.assertEqual(dtc.status.get_byte_as_int(), 0x20)
self.assertEqual(dtc.severity.get_byte_as_int(), 32)
self.assertEqual(dtc.status.get_byte_as_int(), 0x78)
self.assertEqual(dtc.severity.get_byte_as_int(), 0xA0)
self.assertEqual(len(dtc.extended_data), 0)

dtc = response.service_data.dtcs[1]

self.assertEqual(dtc.id, 0x123457)
self.assertEqual(dtc.status.get_byte_as_int(), 0x20)
self.assertEqual(dtc.severity.get_byte_as_int(), 32)
self.assertEqual(dtc.id, 0x112233)
self.assertEqual(dtc.status.get_byte_as_int(), 0x44)
self.assertEqual(dtc.severity.get_byte_as_int(), 0xE0)
self.assertEqual(len(dtc.extended_data), 0)


def test_multiple_data(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, self.expected_request_bytes)
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x3, 0x4])
+ bytes([0x33, 0x12, 0x34, 0x56, 0x20])
+ bytes([0x33, 0x12, 0x34, 0x57, 0x20]))
self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB, 0x02, 0xA7]))
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB, 0x7E, 0xA0, 0x4])
+ bytes([0xA0, 0x12, 0x34, 0x56, 0x78])
+ bytes([0xE0, 0x11, 0x22, 0x33, 0x44]))

def _test_multiple_data(self):
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(self.functional_group_id, self.status_mask, self.severity_mask, self.dtc_class)
response = self.udsclient.get_wwh_obd_dtc_by_status_mask(functional_group_id=0xAB, status_mask=2, severity_mask=0xA0, dtc_class=0x7)
self.assert_multiple_data_response(response)


def test_unexpected_functional_group_id(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB, 0x02, 0xA7])) # AB -> AB+1
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB+1, 0x7E, 0xA0, 0x4]))

def _test_unexpected_functional_group_id(self):
with self.assertRaises(UnexpectedResponseException):
self.udsclient.get_wwh_obd_dtc_by_status_mask(functional_group_id=0xAB, status_mask=2, severity_mask=0xA0, dtc_class=0x7)

class TestreportWWHOBDDTCWithPermanentStatus(ClientServerTest): # Subfn = 0x16
sb = struct.pack('B', 0x55)

def assert_no_data_response(self, response):
self.assertEqual(len(response.service_data.dtcs), 0)
self.assertEqual(response.service_data.dtc_count, 0)

def test_no_data(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, b'\x19' + self.sb + bytes([1]))
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0x1, 0x2, 0x4]))

def _test_no_data(self):
response = self.udsclient.get_wwh_obd_dtc_with_permanent_status(functional_group_id=1)
self.assert_no_data_response(response)

def test_functional_group_verification(self):
pass

def _test_functional_group_verification(self):
with self.assertRaises(ValueError):
self.udsclient.get_wwh_obd_dtc_with_permanent_status(None)

with self.assertRaises(ValueError):
self.udsclient.get_wwh_obd_dtc_with_permanent_status(0xff)

def assert_no_data_response_with_severity_class(self, response):
self.assertEqual(len(response.service_data.dtcs), 0)
self.assertEqual(response.service_data.dtc_count, 0)


def assert_single_data_response(self, response):
assert isinstance(response, Response)
assert isinstance(response.service_data, ReadDTCInformation.ResponseData)
self.assertEqual(len(response.service_data.dtcs), 1)
self.assertEqual(response.service_data.dtc_count, 1)

self.assertIsNotNone(response.service_data.functional_group_id)
self.assertIsNotNone(response.service_data.status_availability)
self.assertIsNone(response.service_data.severity_availability)
self.assertIsNotNone(response.service_data.dtc_format)

self.assertEqual(response.service_data.functional_group_id, 0xAB)
self.assertEqual(response.service_data.status_availability.get_byte_as_int(), 0x7E)
self.assertEqual(response.service_data.dtc_format, Dtc.Format.SAE_J2012_DA_DTCFormat_04) # 0x04

dtc = response.service_data.dtcs[0]

self.assertEqual(dtc.id, 0x123456)
self.assertEqual(dtc.status.get_byte_as_int(), 0x33)
self.assertEqual(dtc.severity.get_byte_as_int(), 0x20)

self.assertEqual(len(dtc.extended_data), 0)

def test_single_data(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB]))
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB, 0x7E, 0x4, 0x20, 0x12, 0x34, 0x56, 0x33]))

def _test_single_data(self):
response = self.udsclient.get_wwh_obd_dtc_with_permanent_status(functional_group_id=0xAB)
self.assert_single_data_response(response)

def assert_multiple_data_response(self, response):
assert isinstance(response, Response)
assert isinstance(response.service_data, ReadDTCInformation.ResponseData)

self.assertEqual(len(response.service_data.dtcs), 2)
self.assertEqual(response.service_data.dtc_count, 2)

self.assertIsNotNone(response.service_data.functional_group_id)
self.assertIsNotNone(response.service_data.status_availability)
self.assertIsNone(response.service_data.severity_availability)
self.assertIsNotNone(response.service_data.dtc_format)

self.assertEqual(response.service_data.functional_group_id, 0xAB)
self.assertEqual(response.service_data.status_availability.get_byte_as_int(), 0x7E)
self.assertEqual(response.service_data.dtc_format, Dtc.Format.SAE_J2012_DA_DTCFormat_04) # 0x04

dtc = response.service_data.dtcs[0]

self.assertEqual(dtc.id, 0x123456)
self.assertEqual(dtc.status.get_byte_as_int(), 0x78)
self.assertEqual(dtc.severity.get_byte_as_int(), 0xA0)
self.assertEqual(len(dtc.extended_data), 0)

dtc = response.service_data.dtcs[1]

self.assertEqual(dtc.id, 0x112233)
self.assertEqual(dtc.status.get_byte_as_int(), 0x44)
self.assertEqual(dtc.severity.get_byte_as_int(), 0xE0)
self.assertEqual(len(dtc.extended_data), 0)


def test_multiple_data(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB]))
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB, 0x7E, 0x4])
+ bytes([0xA0, 0x12, 0x34, 0x56, 0x78])
+ bytes([0xE0, 0x11, 0x22, 0x33, 0x44]))

def _test_multiple_data(self):
response = self.udsclient.get_wwh_obd_dtc_with_permanent_status(functional_group_id=0xAB)
self.assert_multiple_data_response(response)

def test_unexpected_functional_group_id(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, b'\x19' + self.sb + bytes([0xAB])) # AB -> AB+1
self.conn.fromuserqueue.put(b'\x59' + self.sb + bytes([0xAB+1, 0x7E, 0x4]))

def _test_unexpected_functional_group_id(self):
with self.assertRaises(UnexpectedResponseException):
self.udsclient.get_wwh_obd_dtc_with_permanent_status(functional_group_id=0xAB)

class TestreportDTCExtDataRecordByRecordNumber(ClientServerTest): # Subfn = 0x16
sb = struct.pack('B', 0x16)
badsb = struct.pack('B', 0x16+1)
Expand Down
Loading