From 45c2191a8de9a2988f44edc57295508580bb2207 Mon Sep 17 00:00:00 2001 From: Pier-Yves Lessard Date: Sun, 28 Jan 2024 10:48:52 -0500 Subject: [PATCH] USe selector when reading from socket and cleaned unit test a bit --- test/stub.py | 74 ------------------------------ test/test_connection.py | 79 ++++++++++++++++++++++++++------ test/test_stubbed_isotpsock.py | 84 ---------------------------------- udsoncan/connections.py | 26 ++++++----- 4 files changed, 79 insertions(+), 184 deletions(-) delete mode 100755 test/stub.py delete mode 100755 test/test_stubbed_isotpsock.py diff --git a/test/stub.py b/test/stub.py deleted file mode 100755 index b01f545..0000000 --- a/test/stub.py +++ /dev/null @@ -1,74 +0,0 @@ -from udsoncan.exceptions import TimeoutException -from udsoncan import connections, Request, Response -import queue -import logging -import socket -from dataclasses import dataclass - - -class StubbedIsoTPSocket(object): - conns = {} - - def __init__(self, name=None, timeout=1): - self.bound = False - self.interface = None - self.address = None - self.timeout = timeout - - self.queue_in = queue.Queue() # Client reads from this queue. Other end is simulated - - def bind(self, interface, address): - self.interface = interface - self.address = address - self.bound = True - sockkey = (self.interface, self.address) - if sockkey not in StubbedIsoTPSocket.conns: - StubbedIsoTPSocket.conns[sockkey] = dict() - StubbedIsoTPSocket.conns[sockkey][id(self)] = self - - def close(self): - self.bound = False - sockkey = (self.interface, self.address) - if sockkey in StubbedIsoTPSocket.conns: - if id(self) in StubbedIsoTPSocket.conns[sockkey]: - del StubbedIsoTPSocket.conns[sockkey][id(self)] - while not self.queue_in.empty(): - self.queue_in.get() - - def must_receive(self, dst_interface: str, srcaddr: "isotp.Address", dstaddr: "isotp.Address"): - if dst_interface != self.interface: - return False - - @dataclass - class StubbedCanMsg: - is_extended_id: bool - arbitration_id: int - data: bytes - - # Simulate an isotp.CanMessage to work with addresses - msg = StubbedCanMsg( - is_extended_id=srcaddr.is_tx_29bits(), - arbitration_id=srcaddr.get_tx_arbitration_id(), - data=bytes([srcaddr.get_tx_extension_byte()]) if srcaddr.requires_tx_extension_byte() else bytes() - ) - - if dstaddr.is_for_me(msg): - return True - - return False - - def send(self, payload): - # if target_sockkey in StubbedIsoTPSocket.conns: - for target_sockkey in StubbedIsoTPSocket.conns: - dst_interface = target_sockkey[0] - dstaddr = target_sockkey[1] - if self.must_receive(dst_interface, self.address, dstaddr): - for sockid in StubbedIsoTPSocket.conns[target_sockkey]: - StubbedIsoTPSocket.conns[target_sockkey][sockid].queue_in.put(payload) - - def recv(self): - try: - payload = self.queue_in.get(block=True, timeout=self.timeout) - except queue.Empty: - raise socket.timeout - return payload diff --git a/test/test_connection.py b/test/test_connection.py index c4a296d..98612d7 100755 --- a/test/test_connection.py +++ b/test/test_connection.py @@ -1,6 +1,5 @@ from test.UdsTest import UdsTest from udsoncan.connections import * -from test.stub import StubbedIsoTPSocket import socket import threading import time @@ -32,13 +31,13 @@ _AISOTP_POSSIBLE = False +@unittest.skipIf(_isotp_module_available == False, "isotp module not available") class TestIsoTPSocketConnection(UdsTest): def setUp(self): - self.tpsock1 = StubbedIsoTPSocket(timeout=0.1) - self.tpsock2 = StubbedIsoTPSocket(timeout=0.1) + self.tpsock1 = isotp.socket() + self.tpsock2 = isotp.socket() - @unittest.skipIf(_isotp_module_available == False, "Missing isotp module") def test_open(self): addr = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x001, txid=0x002) conn = IsoTPSocketConnection(interface='vcan0', address=addr, tpsock=self.tpsock1, name='unittest') @@ -48,7 +47,6 @@ def test_open(self): conn.close() self.assertFalse(conn.is_open()) - @unittest.skipIf(_isotp_module_available == False, "Missing isotp module") def test_transmit(self): addr1 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x100, txid=0x101) addr2 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x101, txid=0x100) @@ -62,14 +60,18 @@ def test_transmit(self): payload2 = conn2.wait_frame(timeout=0.3, exception=True) self.assertEqual(payload1, payload2) + def tearDown(self) -> None: + self.tpsock1.close() + self.tpsock2.close() + class TestSocketConnection(UdsTest): def server_sock_thread_task(self): - self.thread_started = True + self.started_event.set() self.sock1, addr = self.server_sock.accept() def setUp(self): - self.thread_started = False + self.started_event = threading.Event() self.server_sock_thread = threading.Thread(target=self.server_sock_thread_task) self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -83,17 +85,15 @@ def setUp(self): self.server_sock.listen(1) self.server_sock_thread.start() - t1 = time.time() - while not self.thread_started: - if (time.time() - t1) > 0.5: - raise RuntimeError('Timeout while connecting sockets together.') - time.sleep(0.01) - time.sleep(0.01) + self.started_event.wait(0.5) + if not self.started_event.is_set(): + raise RuntimeError('Timeout while connecting sockets together.') + time.sleep(0.01) # Handle race condition like an amateur self.sock2.connect(self.server_sock.getsockname()) - t1 = time.time() + t1 = time.monotonic() while self.sock1 is None: - if (time.time() - t1) > 0.5: + if (time.monotonic() - t1) > 0.5: raise RuntimeError('Timeout while connecting sockets together.') def tearDown(self): @@ -126,6 +126,55 @@ def test_transmit(self): self.assertEqual(payload1, payload2) +class TestSocketConnectionBlocking(UdsTest): + def server_sock_thread_task(self): + self.started_event.set() + # Race condition here. + self.sock1, addr = self.server_sock.accept() + + def setUp(self): + self.started_event = threading.Event() + self.server_sock_thread = threading.Thread(target=self.server_sock_thread_task) + + self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_sock.setblocking(True) + self.sock1 = None + self.sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + self.server_sock.bind(('127.0.0.1', 0)) + self.server_sock.listen(1) + self.server_sock_thread.start() + + self.started_event.wait(0.5) + if not self.started_event.is_set(): + raise RuntimeError('Timeout while connecting sockets together.') + time.sleep(0.01) # Handle race condition like an amateur + + self.sock2.connect(self.server_sock.getsockname()) + t1 = time.monotonic() + while self.sock1 is None: + if (time.monotonic() - t1) > 0.5: + raise RuntimeError('Timeout while connecting sockets together.') + + def test_open_close_no_block(self): + conn = SocketConnection(self.sock1, name='unittest') + self.assertFalse(conn.is_open()) + conn.open() + self.assertTrue(conn.is_open()) + conn.close() + self.assertFalse(conn.is_open()) + + def tearDown(self): + if isinstance(self.sock1, socket.socket): + self.sock1.close() + + if isinstance(self.sock2, socket.socket): + self.sock2.close() + + if isinstance(self.server_sock, socket.socket): + self.server_sock.close() + + class TestQueueConnection(UdsTest): def setUp(self): self.conn = QueueConnection(name='unittest') diff --git a/test/test_stubbed_isotpsock.py b/test/test_stubbed_isotpsock.py deleted file mode 100755 index 3ec30b0..0000000 --- a/test/test_stubbed_isotpsock.py +++ /dev/null @@ -1,84 +0,0 @@ -import unittest -from test.UdsTest import UdsTest -from test.stub import StubbedIsoTPSocket -from udsoncan.exceptions import * -import socket - -try: - import isotp - _isotp_available = True -except ImportError: - _isotp_available = False - - -@unittest.skipIf(_isotp_available == False, "isotp module not available") -class TestStubbedIsoTPSocket(UdsTest): - def test_open(self): - tpsock = StubbedIsoTPSocket() - self.assertFalse(tpsock.bound) - addr = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x100, txid=0x101) - tpsock.bind(interface='vcan0', address=addr) - self.assertTrue(tpsock.bound) - tpsock.close() - self.assertFalse(tpsock.bound) - - def test_transmit(self): - tpsock1 = StubbedIsoTPSocket() - tpsock2 = StubbedIsoTPSocket(timeout=0.5) - addr1 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x200, txid=0x201) - addr2 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x201, txid=0x200) - tpsock1.bind(interface='vcan0', address=addr1) - tpsock2.bind(interface='vcan0', address=addr2) - - payload1 = b"\x01\x02\x03\x04" - tpsock1.send(payload1) - payload2 = tpsock2.recv() - self.assertEqual(payload1, payload2) - - def test_multicast(self): - tpsock1 = StubbedIsoTPSocket() - tpsock2 = StubbedIsoTPSocket(timeout=0.5) - tpsock3 = StubbedIsoTPSocket(timeout=0.5) - addr1 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x300, txid=0x301) - addr2 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x301, txid=0x300) - addr3 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x301, txid=0x300) - - tpsock1.bind(interface='vcan0', address=addr1) - tpsock2.bind(interface='vcan0', address=addr2) - tpsock3.bind(interface='vcan0', address=addr3) - - payload1 = b"\x01\x02\x03\x04" - tpsock1.send(payload1) - payload2 = tpsock2.recv() - payload3 = tpsock3.recv() - self.assertEqual(payload1, payload2) - self.assertEqual(payload1, payload3) - - def test_empty_on_close(self): - tpsock1 = StubbedIsoTPSocket() - tpsock2 = StubbedIsoTPSocket(timeout=0.2) - addr1 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x400, txid=0x401) - addr2 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x401, txid=0x400) - tpsock1.bind(interface='vcan0', address=addr1) - tpsock2.bind(interface='vcan0', address=addr2) - - payload = b"\x01\x02\x03\x04" - tpsock1.send(payload) - tpsock2.close() - - with self.assertRaises(socket.timeout): - tpsock2.recv() - - def test_no_listener(self): - tpsock1 = StubbedIsoTPSocket() - tpsock2 = StubbedIsoTPSocket(timeout=0.2) - addr1 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x400, txid=0x401) - addr2 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x401, txid=0x400) - tpsock1.bind(interface='vcan0', address=addr1) - - payload = b"\x01\x02\x03\x04" - tpsock1.send(payload) - tpsock2.bind(interface='vcan0', address=addr2) - - with self.assertRaises(socket.timeout): - tpsock2.recv() diff --git a/udsoncan/connections.py b/udsoncan/connections.py index f513a2b..a77c90d 100755 --- a/udsoncan/connections.py +++ b/udsoncan/connections.py @@ -8,6 +8,7 @@ import time from typing import Union, Any, Dict import ctypes +import selectors try: import can # type:ignore @@ -198,7 +199,6 @@ def __init__(self, sock: socket.socket, bufsize: int = 4095, name: Optional[str] self.opened = False self.rxthread = None self.sock = sock - self.sock.settimeout(0.1) # for recv self.bufsize = bufsize def open(self) -> "SocketConnection": @@ -219,13 +219,15 @@ def is_open(self) -> bool: return self.opened def rxthread_task(self) -> None: + sel = selectors.DefaultSelector() + sel.register(self.sock, selectors.EVENT_READ) while not self.exit_requested: try: - data = self.sock.recv(self.bufsize) - if data is not None: - self.rxqueue.put(data) - except socket.timeout: - pass + events = sel.select(timeout=0.2) + if events: + data = self.sock.recv(self.bufsize) + if data is not None: + self.rxqueue.put(data) except Exception: self.exit_requested = True @@ -333,13 +335,15 @@ def is_open(self) -> bool: return self.tpsock.bound def rxthread_task(self) -> None: + sel = selectors.DefaultSelector() + sel.register(self.tpsock._socket, selectors.EVENT_READ) while not self.exit_requested: try: - data = self.tpsock.recv() - if data is not None: - self.rxqueue.put(data) - except socket.timeout: - pass + events = sel.select(timeout=0.2) + if events: + data = self.tpsock.recv() + if data is not None: + self.rxqueue.put(data) except Exception: self.exit_requested = True