Skip to content

Commit

Permalink
tests: Replace deprecated asynctest with manual TCP socket in `Th…
Browse files Browse the repository at this point in the history
…read` (#369)

* tests: Replace deprecated `asynctest` with manual TCP `socket` in `Thread`

`asynctest` is unmaintained and deprecated yet there appears to be no
clean replacement to mock the `async` methods of an `asyncio` client
socket, as previously passed to `asyncio.open_connection()`.  Instead of
inventing a complicated `asyncio` solution around this, create and bind
a `socket` (so that we know the randomly chosen open port upfront) and
accept a single client within a `Thread`.  This temporary `Thread`
closes the connection and can later be `join()`ed when the test is over
to make sure it has finalized and exited.

* test_tcp_models: Always `shutdown()` socket before `close()`

* Allow typechecker union syntax by importing `__future__.annotations`

* test_tcp_models: Only bind server to `localhost`

The client only connects to `localhost` after all, no need to bind all
interfaces.

* test_tcp_models: Enable linger with 0-timeout to force `OSError("Connection reset by peer")`
  • Loading branch information
MarijnS95 committed Apr 8, 2023
1 parent b630a39 commit efcd826
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 134 deletions.
7 changes: 1 addition & 6 deletions omnikinverter/omnikinverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ class OmnikInverter:

_close_session: bool = False

_socket_mock: None = None

async def request(
self,
uri: str,
Expand Down Expand Up @@ -139,10 +137,7 @@ async def tcp_request(self) -> dict[str, Any]:
raise OmnikInverterAuthError(msg)

try:
if self._socket_mock is not None:
reader, writer = await asyncio.open_connection(sock=self._socket_mock)
else: # pragma: no cover
reader, writer = await asyncio.open_connection(self.host, self.tcp_port)
reader, writer = await asyncio.open_connection(self.host, self.tcp_port)
except OSError as exception:
msg = "Failed to open a TCP connection to the Omnik Inverter device"
raise OmnikInverterConnectionError(msg) from exception
Expand Down
16 changes: 2 additions & 14 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ pytest = "^7.2.2"
pytest-asyncio = "^0.21.0"
pytest-cov = "^4.0.0"
yamllint = "^1.30.0"
asynctest = "^0.13.0"
covdefaults = "^2.3.0"

[tool.black]
Expand Down
260 changes: 147 additions & 113 deletions tests/test_tcp_models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
"""Test the models from TCP source."""
import socket

import asynctest
from __future__ import annotations

import struct
from socket import SHUT_RDWR, SO_LINGER, SOL_SOCKET, socket
from threading import Thread
from typing import TYPE_CHECKING

import pytest

from omnikinverter import Device, Inverter, OmnikInverter, tcp
Expand All @@ -13,6 +18,9 @@

from . import load_fixture_bytes

if TYPE_CHECKING:
from collections.abc import Callable, Coroutine


async def test_inverter_tcp_start_marker() -> None:
"""Require start marker."""
Expand Down Expand Up @@ -213,129 +221,155 @@ async def test_tcp_serial_number_unset() -> None:
assert str(excinfo.value) == "serial_number is missing from the request"


class TestTcpWithSocketMock(asynctest.TestCase): # type: ignore[misc]
"""Test cases specific to the TCP backend."""
def tcp_server(
serial_number: int,
reply: str | Callable[[socket], None],
) -> tuple[Coroutine[None, None, None], int]:
"""Run a TCP socket server in a new thread, accepting a single connection."""
# Create socket and generate random port
sock = socket()
sock.bind(("localhost", 0))
(_, port) = sock.getsockname()
sock.listen(1)

async def test_inverter_tcp(self) -> None:
"""Test request from an Inverter - TCP source."""
serial_number = 987654321
socket_mock = asynctest.SocketMock()
socket_mock.type = socket.SOCK_STREAM
def worker() -> None:
"""Accept a single connection and send predefined reply."""
(conn, _) = sock.accept()

def send_side_effect(data: bytes) -> int:
assert data == tcp.create_information_request(serial_number)
asynctest.set_read_ready(socket_mock, self.loop)
return len(data)
data = conn.recv(1024)
req = tcp.create_information_request(serial_number)
assert data == req

def recv_side_effect(_max_bytes: int) -> bytes:
return load_fixture_bytes("tcp_reply.data")
if isinstance(reply, str):
conn.sendall(load_fixture_bytes(reply))
conn.shutdown(SHUT_RDWR)
conn.close()
else:
reply(conn)

socket_mock.send.side_effect = send_side_effect
socket_mock.recv.side_effect = recv_side_effect
# Stop the server
sock.close()

client = OmnikInverter(
host="example.com",
source_type="tcp",
serial_number=serial_number,
_socket_mock=socket_mock,
)
thread = Thread(target=worker)
thread.start()

inverter: Inverter = await client.inverter()

assert inverter
assert inverter.solar_rated_power is None
assert inverter.solar_current_power == 2615

assert inverter.model is None
assert inverter.serial_number == "NLDN012345CS4321"
assert inverter.temperature == 43.1
assert inverter.dc_input_voltage == [187.3, 188.9, None]
assert inverter.dc_input_current == [8.1, 7.800000000000001, None]
assert inverter.ac_output_voltage == [239.60000000000002, None, None]
assert inverter.ac_output_current == [10.8, None, None]
assert inverter.ac_output_frequency == [50.06, None, None]
assert inverter.ac_output_power == [2615.0, None, None]
assert inverter.solar_energy_today == 7.4
assert inverter.solar_energy_total == 65432.100000000006
assert inverter.solar_hours_total == 54321
assert inverter.inverter_active is True
assert inverter.firmware == "NL1-V1.0-0077-4"
assert inverter.firmware_slave == "V2.0-0024"

async def test_inverter_tcp_offline(self) -> None:
"""Test request from an Inverter (offline) - TCP source."""
serial_number = 1608449224
socket_mock = asynctest.SocketMock()
socket_mock.type = socket.SOCK_STREAM

def send_side_effect(data: bytes) -> int:
assert data == tcp.create_information_request(serial_number)
asynctest.set_read_ready(socket_mock, self.loop)
return len(data)

def recv_side_effect(_max_bytes: int) -> bytes:
return load_fixture_bytes("tcp_reply_offline.data")

socket_mock.send.side_effect = send_side_effect
socket_mock.recv.side_effect = recv_side_effect

client = OmnikInverter(
host="example.com",
source_type="tcp",
serial_number=serial_number,
_socket_mock=socket_mock,
)
async def wait_for_server_thread() -> None:
thread.join()

return (wait_for_server_thread(), port)


async def test_inverter_tcp() -> None:
"""Test request from an Inverter - TCP source."""
serial_number = 987654321

(server_exit, port) = tcp_server(serial_number, "tcp_reply.data")

client = OmnikInverter(
host="localhost",
source_type="tcp",
serial_number=serial_number,
tcp_port=port,
)

inverter: Inverter = await client.inverter()

assert inverter
assert inverter.solar_rated_power is None
assert inverter.solar_current_power == 2615

assert inverter.model is None
assert inverter.serial_number == "NLDN012345CS4321"
assert inverter.temperature == 43.1
assert inverter.dc_input_voltage == [187.3, 188.9, None]
assert inverter.dc_input_current == [8.1, 7.800000000000001, None]
assert inverter.ac_output_voltage == [239.60000000000002, None, None]
assert inverter.ac_output_current == [10.8, None, None]
assert inverter.ac_output_frequency == [50.06, None, None]
assert inverter.ac_output_power == [2615.0, None, None]
assert inverter.solar_energy_today == 7.4
assert inverter.solar_energy_total == 65432.100000000006
assert inverter.solar_hours_total == 54321
assert inverter.inverter_active is True
assert inverter.firmware == "NL1-V1.0-0077-4"
assert inverter.firmware_slave == "V2.0-0024"

await server_exit

inverter: Inverter = await client.inverter()

assert inverter
assert inverter.solar_rated_power is None
assert inverter.solar_current_power == 0

assert inverter.model is None
assert inverter.serial_number == "NLBN4020157P9024"
assert inverter.temperature is None
assert inverter.dc_input_voltage == [0.0, 0.0, 0.0]
assert inverter.dc_input_current == [0.0, 0.0, 0.0]
assert inverter.ac_output_voltage == [0.0, 0.0, 0.0]
assert inverter.ac_output_current == [0.0, 0.0, 0.0]
assert inverter.ac_output_frequency == [0.0, 0.0, 0.0]
assert inverter.ac_output_power == [0.0, 0.0, 0.0]
assert inverter.solar_energy_today == 4.7
assert inverter.solar_energy_total == 15818.0
assert inverter.solar_hours_total == 0
assert inverter.inverter_active is False
assert not inverter.firmware
assert not inverter.firmware_slave

async def test_connection_broken(self) -> None:
"""Test on connection broken after success - TCP source."""
serial_number = 1
socket_mock = asynctest.SocketMock()
socket_mock.type = socket.SOCK_STREAM

client = OmnikInverter(
host="example.com",
source_type="tcp",
serial_number=serial_number,
_socket_mock=socket_mock,
)

def send_side_effect(data: bytes) -> int:
assert data == tcp.create_information_request(serial_number)
asynctest.set_read_ready(socket_mock, self.loop)
return len(data)
async def test_inverter_tcp_offline() -> None:
"""Test request from an Inverter (offline) - TCP source."""
serial_number = 1608449224

socket_mock.send.side_effect = send_side_effect
socket_mock.recv.side_effect = OSError("Connection broken")
(server_exit, port) = tcp_server(serial_number, "tcp_reply_offline.data")

with pytest.raises(OmnikInverterConnectionError) as excinfo:
assert await client.inverter()
client = OmnikInverter(
host="localhost",
source_type="tcp",
serial_number=serial_number,
tcp_port=port,
)

inverter: Inverter = await client.inverter()

assert inverter
assert inverter.solar_rated_power is None
assert inverter.solar_current_power == 0

assert inverter.model is None
assert inverter.serial_number == "NLBN4020157P9024"
assert inverter.temperature is None
assert inverter.dc_input_voltage == [0.0, 0.0, 0.0]
assert inverter.dc_input_current == [0.0, 0.0, 0.0]
assert inverter.ac_output_voltage == [0.0, 0.0, 0.0]
assert inverter.ac_output_current == [0.0, 0.0, 0.0]
assert inverter.ac_output_frequency == [0.0, 0.0, 0.0]
assert inverter.ac_output_power == [0.0, 0.0, 0.0]
assert inverter.solar_energy_today == 4.7
assert inverter.solar_energy_total == 15818.0
assert inverter.solar_hours_total == 0
assert inverter.inverter_active is False
assert not inverter.firmware
assert not inverter.firmware_slave

await server_exit


async def test_connection_broken() -> None:
"""Test closed connection after successful handshake - TCP source."""
serial_number = 1

assert (
str(excinfo.value)
== "Failed to communicate with the Omnik Inverter device over TCP"
def close_immediately(conn: socket) -> None:
"""Close the connection and send RST."""
linger_on = 1
linger_timeout = 0
# Enabling linger with a timeout of 0 causes close() to abort the connection,
# forcing "Connection reset by peer" on the client
conn.setsockopt(
SOL_SOCKET,
SO_LINGER,
struct.pack("ii", linger_on, linger_timeout),
)
conn.close()

(server_exit, port) = tcp_server(serial_number, close_immediately)

client = OmnikInverter(
host="localhost",
source_type="tcp",
serial_number=serial_number,
tcp_port=port,
)

with pytest.raises(OmnikInverterConnectionError) as excinfo:
assert await client.inverter()

assert (
excinfo.value.args[0]
== "Failed to communicate with the Omnik Inverter device over TCP"
)

await server_exit


async def test_connection_failed() -> None:
Expand Down

0 comments on commit efcd826

Please sign in to comment.