Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions isotp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,12 +709,15 @@ def send(self,
:type send_timeout: float or None

:raises ValueError: Given data is not a bytearray, a tuple (generator,size) or the size is too big
:raises RuntimeError: Transmit queue is full
:raises RuntimeError: Transmit queue is full or tried to transmit while the stack is configured in :ref:`listen mode<param_listen_mode>`
:raises BlockingSendTimeout: When :ref:`blocking_send<param_blocking_send>` is set to ``True`` and the send operation does not complete in the given timeout.
:raises BlockingSendFailure: When :ref:`blocking_send<param_blocking_send>` is set to ``True`` and the transmission failed for any reason (e.g. unexpected frame or bad timings), including a timeout. Note that
:class:`BlockingSendTimeout<BlockingSendTimeout>` inherits :class:`BlockingSendFailure<BlockingSendFailure>`.
"""

if self.params.listen_mode:
raise RuntimeError("Cannot transmit when listen_mode=True")

if target_address_type is None:
target_address_type = self.params.default_target_address_type
else:
Expand Down Expand Up @@ -1007,13 +1010,13 @@ def _process_tx(self) -> ProcessTxReport:
return self.ProcessTxReport(msg=None, immediate_rx_required=False)

if self.tx_state == self.TxState.IDLE:
self._trigger_error(isotp.errors.UnexpectedFlowControlError('Received a FlowControl message while transmission was Idle. Ignoring'))
self._trigger_error(isotp.errors.UnexpectedFlowControlError('Received a FlowControl message while transmission was Idle. Ignoring'), inhibit_in_listen_mode=True)
else:
if flow_control_frame.flow_status == PDU.FlowStatus.Wait:
if self.params.wftmax == 0:
if self.params.wftmax == 0 and not self.params.listen_mode:
self._trigger_error(isotp.errors.UnsupportedWaitFrameError(
'Received a FlowControl requesting to wait, but wftmax is set to 0'))
elif self.wft_counter >= self.params.wftmax:
elif self.wft_counter >= self.params.wftmax and not self.params.listen_mode:
self._trigger_error(isotp.errors.MaximumWaitFrameReachedError(
'Received %d wait frame which is the maximum set in params.wftmax' % (self.wft_counter)))
self._stop_sending(success=False)
Expand Down Expand Up @@ -1388,10 +1391,11 @@ def _start_reception_after_first_frame_if_valid(self, pdu: PDU) -> bool:

return started

def _trigger_error(self, error: isotp.errors.IsoTpError) -> None:
def _trigger_error(self, error: isotp.errors.IsoTpError, inhibit_in_listen_mode:bool=False) -> None:
if self.error_handler is not None:
if hasattr(self.error_handler, '__call__') and isinstance(error, isotp.errors.IsoTpError):
self.error_handler(error)
if not (inhibit_in_listen_mode and self.params.listen_mode):
self.error_handler(error)
else:
self.logger.warning('Given error handler is not a callable object.')

Expand Down
68 changes: 65 additions & 3 deletions test/test_transport_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def error_handler(self, error):
self.error_triggered[error.__class__].append(error)

def assert_no_error_reported(self):
self.assertEqual(len(self.error_triggered), 0, "At least 1 error was reported")
self.assertEqual(len(self.error_triggered), 0, "Errors were reported and shouldn't have")

def read_queue_blocking(self, q: queue.Queue, timeout: float):
try:
Expand Down Expand Up @@ -171,7 +171,11 @@ def test_blocking_send(self):
self.layer1.send(bytes([1] * 100), send_timeout=5)
self.assert_no_error_reported()

def test_listen_mode(self):
def test_listen_mode_receiver(self):
# listen mode enabled. Address is the receiver address
self.layer2.params.blocksize=5
self.layer2.params.stmin=10
self.layer2.load_params()
layer3_rx_queue = queue.Queue()
layer3_tx_queue = queue.Queue()

Expand Down Expand Up @@ -206,6 +210,64 @@ def test_listen_mode(self):
finally:
layer3.stop()

def test_listen_mode_transmitter(self):
# listen mode enabled. Address is the transmitter address
# Expect no errors
self.layer2.params.blocksize=5
self.layer2.params.stmin=10
self.layer2.load_params()
layer3_rx_queue = queue.Queue()
layer3_tx_queue = queue.Queue()

self.queue1to2.add_tx_splice(layer3_rx_queue)
self.queue2to1.add_tx_splice(layer3_rx_queue)

params3 = self.STACK_PARAMS.copy()
params3.update(dict(logger_name='layer3', listen_mode=True))

# Layer 3 should receive the same thing as layer 2 even though it receives all messages
layer3 = isotp.TransportLayer(
txfn=partial(self.send_queue, layer3_tx_queue),
rxfn=partial(self.read_queue_blocking, layer3_rx_queue),
address=self.address1,
error_handler=self.error_handler,
params=params3
)

unittest_logging.configure_transport_layer(layer3)
layer3.start()
try:
payload = bytes([x % 255 for x in range(100)])
self.layer1.send(payload)
payload2 = self.layer2.recv(block=True, timeout=5)
self.assertEqual(payload, payload2)

self.assertFalse(layer3.available()) # Address does not match receiver address

self.assert_no_error_reported()
self.assertTrue(layer3_tx_queue.empty()) # layer3 cannot send
finally:
layer3.stop()

def test_listen_mode_cannot_transmit(self):
params3 = self.STACK_PARAMS.copy()
params3.update(dict(logger_name='layer3', listen_mode=True))

layer3_tx_queue = queue.Queue()
layer3_rx_queue = queue.Queue()
# Layer 3 should receive the same thing as layer 2 even though it receives all messages
layer3 = isotp.TransportLayer(
txfn=partial(self.send_queue, layer3_tx_queue),
rxfn=partial(self.read_queue_blocking, layer3_rx_queue),
address=self.address1,
error_handler=self.error_handler,
params=params3
)

with self.assertRaises(Exception):
layer3.send(bytes([1,2,3,4,5]))


def test_no_call_to_process_after_start(self):
# Make sure we maintain backward compatibility without introducing weird race conditions into old application
with self.assertRaises(RuntimeError):
Expand Down Expand Up @@ -287,7 +349,7 @@ def error_handler(self, error):
self.error_triggered[error.__class__].append(error)

def assert_no_error_reported(self):
self.assertEqual(len(self.error_triggered), 0, "At least 1 error was reported")
self.assertEqual(len(self.error_triggered), 0, "Errors were reported and shouldn't have")

def read_queue_blocking(self, q: queue.Queue, timeout: float):
try:
Expand Down