-
-
Notifications
You must be signed in to change notification settings - Fork 193
/
InputOutputControlByIdentifier.py
executable file
·266 lines (206 loc) · 12.4 KB
/
InputOutputControlByIdentifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
import struct
import math
from udsoncan.Request import Request
from udsoncan.Response import Response
from udsoncan import IOMasks, IOValues, DidCodec, CodecDefinition, IOConfig, IOConfigEntry
from udsoncan.exceptions import *
from udsoncan.BaseService import BaseService, BaseSubfunction, BaseResponseData
from udsoncan.ResponseCode import ResponseCode
import udsoncan.tools as tools
from udsoncan.common.dids import make_did_codec_from_definition, fetch_codec_definition_from_config
from typing import Optional, Any, Union, List, Dict, cast
class InputOutputControlByIdentifier(BaseService):
_sid = 0x2F
_use_subfunction = False
class ControlParam(BaseSubfunction):
"""
InputOutputControlByIdentifier defined control parameters as defined by ISO-14229:2006, Annex E
"""
__pretty_name__ = 'control parameter'
returnControlToECU = 0
resetToDefault = 1
freezeCurrentState = 2
shortTermAdjustment = 3
supported_negative_response = [ResponseCode.IncorrectMessageLengthOrInvalidFormat,
ResponseCode.ConditionsNotCorrect,
ResponseCode.RequestOutOfRange,
ResponseCode.SecurityAccessDenied
]
class ResponseData(BaseResponseData):
"""
.. data:: did_echo
DID echoed back by the server
.. data:: control_param_echo
control_param echoed back by the server
.. data:: decoded_data
Value processed by the :ref:`DidCodec<DidCodec>`.decode() method
"""
did_echo: int
control_param_echo: Optional[int]
decoded_data: Optional[Any]
def __init__(self, did_echo: int, control_param_echo: Optional[int] = None, decoded_data: Optional[Any] = None):
super().__init__(InputOutputControlByIdentifier)
self.did_echo = did_echo
self.control_param_echo = control_param_echo
self.decoded_data = decoded_data
class InterpretedResponse(Response):
service_data: "InputOutputControlByIdentifier.ResponseData"
@classmethod
def make_request(cls,
did: int,
control_param: Optional[int] = None,
values: Optional[Union[List[Any], Dict[str, Any], IOValues]] = None,
masks: Optional[Union[List[str], Dict[str, bool], IOMasks, bool]] = None,
ioconfig: Optional[IOConfig] = None
) -> Request:
"""
Generates a request for InputOutputControlByIdentifier
:param did: Data identifier to represent the IO
:type did: int
:param control_param: Optional parameter that can be a value defined in :class:`InputOutputControlByIdentifier.ControlParam<udsoncan.services.InputOutputControlByIdentifier.ControlParam>`
:type control_param: int
:param values: Optional values to send to the server. This parameter will be given to :ref:`DidCodec<DidCodec>`.encode() method.
It can be:
- A list for positional arguments
- A dict for named arguments
- An instance of :ref:`IOValues<IOValues>` for mixed arguments
:type values: list, dict, :ref:`IOValues<IOValues>`
:param masks: Optional mask record for composite values. The mask definition must be included in ``ioconfig``
It can be:
- A list naming the bit mask to set
- A dict with the mask name as a key and a boolean as the value (True to set the mask, False to clear it)
- An instance of :ref:`IOMask<IOMask>`
- A boolean value to set all masks to the same value.
:type masks: list, dict, :ref:`IOMask<IOMask>`, bool
:param ioconfig: Definition of DID codecs. Dictionary mapping a DID (int) to a valid :ref:`DidCodec<DidCodec>` class or pack/unpack string.
It is possible to use composite :ref:`DidCodec<DidCodec>` by specifying a dict with entries : codec, mask, mask_size.
:type ioconfig: dict[int] = :ref:`DidCodec<DidCodec>`, dict
:raises ValueError: If parameters are out of range, missing or wrong type
:raises ConfigError: If given DID is not defined within ioconfig
"""
tools.validate_int(did, min=0, max=0xffff, name='DID')
if control_param is not None:
if not isinstance(control_param, int):
raise ValueError("control_param must be a valid integer")
if control_param < 0 or control_param > 3:
raise ValueError(
'control_param must either be returnControlToECU(0), resetToDefault(1), freezeCurrentState(2), shortTermAdjustment(3). %d given.' % control_param)
if values is not None:
if isinstance(values, list):
values = IOValues(*values)
if isinstance(values, dict):
values = IOValues(**values)
if not isinstance(values, IOValues):
raise ValueError("values must be an instance of IOValues")
if masks is not None:
if isinstance(masks, list):
masks = IOMasks(*masks)
if isinstance(masks, dict):
masks = IOMasks(**masks)
if not isinstance(masks, IOMasks) and not isinstance(masks, bool):
raise ValueError("masks must be an instance of IOMask or a boolean value")
if values is None and masks is not None:
raise ValueError('An IOValue must be given if a IOMask is provided.')
if ioconfig is None:
raise ValueError("ioconfig must be provided")
request = Request(service=cls)
request.data = bytes()
# IO dids are defined in client config.
ioconfig_validated = tools.check_io_config(did, ioconfig)
io_config_entry = tools.fetch_io_entry_from_config(did, ioconfig_validated)
request.data += struct.pack('>H', did)
# This parameter is optional according to standard
if control_param is not None:
request.data += struct.pack('B', control_param)
# tools.
codec = make_did_codec_from_definition(io_config_entry) # Get IO codec from config
if values is not None:
request.data += codec.encode(*values.args, **values.kwargs)
if masks is not None: # Skip the masks byte if none is given.
if isinstance(masks, bool):
byte: bytes = b'\xFF' if masks == True else b'\x00'
if 'mask_size' in io_config_entry and io_config_entry['mask_size'] is not None:
mask_size = io_config_entry['mask_size']
assert mask_size is not None # mypy nitpick
request.data += (byte * mask_size)
else:
raise ConfigError(
'mask_size', msg='Given mask is boolean value, indicating that all mask should be set to same value, but no mask_size is defined in configuration. Cannot guess how many bits to set.')
elif isinstance(masks, IOMasks):
if 'mask' not in io_config_entry:
raise ConfigError('mask', msg='Cannot apply given mask. Input/Output configuration does not define their position (and size).')
masks_config = io_config_entry['mask']
if masks_config is None:
masks_config = {}
given_masks = masks.get_dict()
numeric_val: int = 0
for mask_name in given_masks:
if mask_name not in masks_config:
raise ConfigError('mask_size', msg='Cannot set mask bit for mask %s. The configuration does not define its position' % (mask_name))
if given_masks[mask_name] == True:
numeric_val |= masks_config[mask_name]
size = math.ceil(math.log(numeric_val + 1, 2) / 8.0)
if 'mask_size' in io_config_entry:
mask_size = io_config_entry['mask_size']
if mask_size is not None:
size = mask_size
request.data += numeric_val.to_bytes(size, 'big')
return request
@classmethod
def interpret_response(cls,
response: Response,
control_param: Optional[int] = None,
tolerate_zero_padding: bool = True,
ioconfig: Optional[IOConfig] = None
) -> InterpretedResponse:
"""
Populates the response ``service_data`` property with an instance of :class:`InputOutputControlByIdentifier.ResponseData<udsoncan.services.InputOutputControlByIdentifier.ResponseData>`
:param response: The received response to interpret
:type response: :ref:`Response<Response>`
:param control_param: Same optional control parameter value given to make_request()
:type control_param: int
:param tolerate_zero_padding: Ignore trailing zeros in the response data avoiding raising false :class:`InvalidResponseException<udsoncan.exceptions.InvalidResponseException>`.
:type tolerate_zero_padding: bool
:param ioconfig: Definition of DID codecs. Dictionary mapping a DID (int) to a valid :ref:`DidCodec<DidCodec>` class or pack/unpack string.
It is possible to use composite DidCodec by specifying a dict with entries : codec, mask, mask_size.
:type ioconfig: dict[int] = :ref:`DidCodec<DidCodec>`, dict
:raises ValueError: If parameters are out of range, missing or wrong type
:raises ConfigError: If DID echoed back by the server is not in the ``ioconfig`` definition
:raises InvalidResponseException: If response data is incomplete or if DID data does not match codec length.
"""
if response.data is None:
raise InvalidResponseException(response, "No data in response")
if ioconfig is None:
raise ValueError("IoConfig must be defined")
min_response_size = 2 if control_param is not None else 1 # Spec specifies that if first byte is a ControlParameter, it must be echoed back by the server
if len(response.data) < min_response_size:
raise InvalidResponseException(response, "Response must be at least %d bytes long" % min_response_size)
response.service_data = cls.ResponseData(
did_echo=struct.unpack(">H", response.data[0:2])[0]
)
did = response.service_data.did_echo
ioconfig_validated = tools.check_io_config(did, ioconfig) # IO DIDs are defined in client config.
io_config_entry = tools.fetch_io_entry_from_config(did, ioconfig_validated) # Get requested did definition (given or default)
codec = make_did_codec_from_definition(io_config_entry) # Get IO codec from config
next_byte = 2
if control_param is not None:
if len(response.data) < next_byte:
raise InvalidResponseException(
response, 'Response should include an echo of the InputOutputControlParameter (0x%02x)' % control_param)
response.service_data.control_param_echo = response.data[next_byte]
next_byte += 1
if len(response.data) >= next_byte:
remaining_data = response.data[next_byte:]
try:
payload_size = len(codec)
except DidCodec.ReadAllRemainingData:
payload_size = len(response.data) - next_byte
if len(remaining_data) > payload_size:
if remaining_data[payload_size:] == b'\x00' * (len(remaining_data) - payload_size):
if tolerate_zero_padding:
remaining_data = remaining_data[0:payload_size]
try:
response.service_data.decoded_data = codec.decode(remaining_data)
except Exception as e:
raise InvalidResponseException(response, 'Response from server could not be decoded. Exception is : %s' % e)
return cast(InputOutputControlByIdentifier.InterpretedResponse, response)