Skip to content

Commit

Permalink
Improve reduce function for Automotive Scanner Enumerators (#3740)
Browse files Browse the repository at this point in the history
  • Loading branch information
polybassa committed Sep 21, 2022
1 parent fdfff8e commit 799f272
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 33 deletions.
2 changes: 1 addition & 1 deletion scapy/contrib/automotive/gm/gmlan_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def _get_table_entry_z(self, tup):

def pre_execute(self, socket, state, global_configuration):
# type: (_SocketUnion, EcuState, AutomotiveTestCaseExecutorConfiguration) -> None # noqa: E501
if cast(ServiceEnumerator, self)._retry_pkt[state] is not None and \
if cast(ServiceEnumerator, self)._retry_pkt[state] and \
not global_configuration.unittest:
# this is a retry execute. Wait much longer than usual because
# a required time delay not expired could have been received
Expand Down
17 changes: 8 additions & 9 deletions scapy/contrib/automotive/scanner/enumerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ def __init__(self):
self._result_packets = OrderedDict() # type: Dict[bytes, Packet]
self._results = list() # type: List[_AutomotiveTestCaseScanResult]
self._request_iterators = dict() # type: Dict[EcuState, Iterable[Packet]] # noqa: E501
self._retry_pkt = defaultdict(
lambda: None) # type: Dict[EcuState, Optional[Union[Packet, Iterable[Packet]]]] # noqa: E501
self._retry_pkt = defaultdict(list) # type: Dict[EcuState, Union[Packet, Iterable[Packet]]] # noqa: E501
self._negative_response_blacklist = [0x10, 0x11] # type: List[int]
self._requests_per_state_estimated = None # type: Optional[int]

Expand Down Expand Up @@ -173,12 +172,14 @@ def _get_initial_requests(self, **kwargs):
def __reduce__(self): # type: ignore
f, t, d = super(ServiceEnumerator, self).__reduce__() # type: ignore
try:
del d["_request_iterators"]
for k, v in six.iteritems(d["_request_iterators"]):
d["_request_iterators"][k] = list(v)
except KeyError:
pass

try:
del d["_retry_pkt"]
for k in d["_retry_pkt"]:
d["_retry_pkt"][k] = list(self._get_retry_iterator(k))
except KeyError:
pass
return f, t, d
Expand Down Expand Up @@ -214,9 +215,7 @@ def _store_result(self, state, req, res):
def _get_retry_iterator(self, state):
# type: (EcuState) -> Iterable[Packet]
retry_entry = self._retry_pkt[state]
if retry_entry is None:
return []
elif isinstance(retry_entry, Packet):
if isinstance(retry_entry, Packet):
log_automotive.debug("Provide retry packet")
return [retry_entry]
else:
Expand Down Expand Up @@ -376,7 +375,7 @@ def _evaluate_response(self,
return True

# cleanup retry packet
self._retry_pkt[state] = None
self._retry_pkt[state] = []

return self._evaluate_ecu_state_modifications(state, request, response)

Expand Down Expand Up @@ -435,7 +434,7 @@ def _populate_retry(self,
current execution was already a retry execution.
"""

if self._retry_pkt[state] is None:
if not self._get_retry_iterator(state):
# This was no retry since the retry_pkt is None
self._retry_pkt[state] = request
log_automotive.debug(
Expand Down
2 changes: 1 addition & 1 deletion scapy/contrib/automotive/uds_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ def _get_table_entry_z(self, tup):

def pre_execute(self, socket, state, global_configuration):
# type: (_SocketUnion, EcuState, AutomotiveTestCaseExecutorConfiguration) -> None # noqa: E501
if cast(ServiceEnumerator, self)._retry_pkt[state] is not None:
if cast(ServiceEnumerator, self)._retry_pkt[state]:
# this is a retry execute. Wait much longer than usual because
# a required time delay not expired could have been received
# on the previous attempt
Expand Down
12 changes: 6 additions & 6 deletions test/contrib/automotive/gm/scanner.uts
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,21 @@ s = EcuState(session=1)

assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), None, **config)
config = {"exit_if_service_not_supported": True}
assert e._retry_pkt[s] == None
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x11"), **config)
assert e._retry_pkt[s] == None
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x22"), **config)
assert e._retry_pkt[s] == GMLAN(b"\x27\x01")
assert False == e._evaluate_response(s, GMLAN(b"\x27\x02"), GMLAN(b"\x7f\x27\x22"), **config)
assert e._retry_pkt[s] is None
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x37"), **config)
assert e._retry_pkt[s] == GMLAN(b"\x27\x01")
assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x7f\x27\x37"), **config)
assert e._retry_pkt[s] == None
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x67\x01ab"), **config)
assert e._retry_pkt[s] == None
assert not e._retry_pkt[s]
assert False == e._evaluate_response(s, GMLAN(b"\x27\x01"), GMLAN(b"\x67\x02ab"), **config)
assert e._retry_pkt[s] == None
assert not e._retry_pkt[s]


= Simulate ECU and run Scanner
Expand Down
20 changes: 10 additions & 10 deletions test/contrib/automotive/scanner/enumerator.uts
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,21 @@ assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"),
assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x11"), **conf)
assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x7f"), **conf)
conf = {"exit_if_service_not_supported": False, "retry_if_busy_returncode": True}
assert e._retry_pkt[EcuState(session=1)] == None
assert not e._retry_pkt[EcuState(session=1)]
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x10"), **conf)
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x11"), **conf)
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x7f"), **conf)
assert e._retry_pkt[EcuState(session=1)] == None
assert not e._retry_pkt[EcuState(session=1)]
assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x21"), **conf)
assert e._retry_pkt[EcuState(session=1)] == UDS(b"\x10\x03abcd")
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x21"), **conf)
assert e._retry_pkt[EcuState(session=1)] == None
assert not e._retry_pkt[EcuState(session=1)]

assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x50\x03\x00"), **conf)
assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x11\x03abcd"), UDS(b"\x51\x03\x00"), **conf)
conf = {"retry_if_none_received": True}
assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), None, **conf)
assert e._retry_pkt[EcuState(session=1)] is not None
assert e._retry_pkt[EcuState(session=1)]


= ServiceEnumerator execute
Expand Down Expand Up @@ -286,14 +286,14 @@ e = MyTestCase()

e.execute(sock, EcuState(session=1), exit_if_service_not_supported=True)

assert e._retry_pkt[EcuState(session=1)] is None
assert not e._retry_pkt[EcuState(session=1)]
assert len(e.results_with_response) == 1
assert len(e.results_with_negative_response) == 1
assert e.completed

e.execute(sock, EcuState(session=2), exit_if_service_not_supported=True)

assert e._retry_pkt[EcuState(session=2)] is None
assert not e._retry_pkt[EcuState(session=2)]
assert len(e.results_with_response) == 2
assert len(e.results_with_negative_response) == 2
assert e.completed
Expand All @@ -307,15 +307,15 @@ e = MyTestCase()

e.execute(sock, EcuState(session=1))

assert e._retry_pkt[EcuState(session=1)] is not None
assert e._retry_pkt[EcuState(session=1)]
assert len(e.results_with_response) == 1
assert len(e.results_with_negative_response) == 1
assert len(e.results_without_response) == 0
assert not e.completed

e.execute(sock, EcuState(session=1))

assert e._retry_pkt[EcuState(session=1)] is None
assert not e._retry_pkt[EcuState(session=1)]
assert len(e.results_with_response) == 2
assert len(e.results_with_negative_response) == 2
assert len(e.results_without_response) == 9
Expand All @@ -330,7 +330,7 @@ e = MyTestCase()

e.execute(sock, EcuState(session=1), retry_if_busy_returncode=False)

assert e._retry_pkt[EcuState(session=1)] is None
assert not e._retry_pkt[EcuState(session=1)]
assert len(e.results_with_response) == 1
assert len(e.results_with_negative_response) == 1
assert len(e.results_without_response) == 9
Expand All @@ -345,7 +345,7 @@ e = MyTestCase()

e.execute(sock, EcuState(session=1), execution_time=-1)

assert e._retry_pkt[EcuState(session=1)] is None
assert not e._retry_pkt[EcuState(session=1)]
assert len(e.results_with_response) == 1
assert len(e.results_with_negative_response) == 1
assert len(e.results_without_response) == 0
Expand Down
12 changes: 6 additions & 6 deletions test/contrib/automotive/scanner/uds_scanner.uts
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,21 @@ s = EcuState(session=1)

assert False == e._evaluate_response(s, UDS(b"\x27\x01"), None, **config)
config = {"exit_if_service_not_supported": True}
assert e._retry_pkt[s] == None
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x11"), **config)
assert e._retry_pkt[s] == None
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x24"), **config)
assert e._retry_pkt[s] == UDS(b"\x27\x01")
assert False == e._evaluate_response(s, UDS(b"\x27\x02"), UDS(b"\x7f\x27\x24"), **config)
assert e._retry_pkt[s] is None
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config)
assert e._retry_pkt[s] == UDS(b"\x27\x01")
assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config)
assert e._retry_pkt[s] == None
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x01ab"), **config)
assert e._retry_pkt[s] == None
assert not e._retry_pkt[s]
assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x02ab"), **config)
assert e._retry_pkt[s] == None
assert not e._retry_pkt[s]


= Test UDS_SA_XOR_Enumerator stand alone mode
Expand Down

0 comments on commit 799f272

Please sign in to comment.