In [None]:
import asyncio
import logging
from typing import Any, Callable, Dict, NamedTuple, Optional, Tuple

In [None]:
_LOGGER = logging.getLogger(__name__)

In [None]:
class _UdpInverterProtocol(asyncio.DatagramProtocol):
    """
    Datagram protocol which sends a single request and waits for a valid response.

    The request is sent once the connection is made and re-sent whenever no valid
    response arrived within `timeout` seconds or an invalid response was received,
    up to `retries` re-sends. A valid response becomes the result of the
    `on_response_received` future; if the socket is closed without one, the
    future is cancelled.
    """

    def __init__(
            self,
            request: bytes,
            validator: Callable[[bytes], bool],
            on_response_received: asyncio.futures.Future,
            timeout: int = 2,
            retries: int = 3
    ):
        """
        request -- raw bytes to send
        validator -- returns True when the received bytes are an acceptable response
        on_response_received -- future receiving the response (or being cancelled)
        timeout -- seconds to wait for a response before re-sending
        retries -- maximum number of re-sends
        """
        self.request: bytes = request
        self.validator: Callable[[bytes], bool] = validator
        self.on_response_received: asyncio.futures.Future = on_response_received
        self.transport: asyncio.transports.DatagramTransport
        self.timeout: int = timeout
        self.retries: int = retries
        self.retry_nr: int = 1
        # Handle of the currently armed timeout callback (at most one at a time).
        self._timer: Optional[asyncio.TimerHandle] = None

    def connection_made(self, transport: asyncio.transports.DatagramTransport):
        """Remember the transport and send the request for the first time."""
        self.transport = transport
        self._send_request()

    def connection_lost(self, exc: Optional[Exception]):
        """Stop the timeout timer and cancel the future if nobody resolved it."""
        if exc is not None:
            _LOGGER.debug("Socket closed with error: '%s'", exc)
        self._cancel_timer()
        if not self.on_response_received.done():
            self.on_response_received.cancel()

    def datagram_received(self, data: bytes, addr: Tuple[str, int]):
        """Resolve the future with a valid response, otherwise try again."""
        _LOGGER.debug("Received: '%s'", data.hex())
        if self.on_response_received.done():
            # The future was already resolved or cancelled; calling set_result()
            # on it would raise InvalidStateError, so late datagrams are dropped.
            return
        if self.validator(data):
            self.on_response_received.set_result(data)
            self.transport.close()
        else:
            _LOGGER.debug(
                "Invalid response length: %d",
                len(data),
            )
            self._retry_or_close()

    def error_received(self, exc: Exception):
        """Log the error; the timeout mechanism takes care of re-sending."""
        _LOGGER.debug("Received error: '%s'", exc)

    def _send_request(self):
        """Send the request and (re-)arm the single timeout timer."""
        _LOGGER.debug("Send: '%s'", self.request.hex())
        self.transport.sendto(self.request)
        # Replace any pending timer so that timers do not pile up across re-sends.
        self._cancel_timer()
        self._timer = asyncio.get_running_loop().call_later(
            self.timeout, self._timeout_heartbeat
        )

    def _cancel_timer(self):
        """Cancel the pending timeout callback, if there is one."""
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

    def _retry_or_close(self):
        """Re-send the request while retries remain, otherwise close the socket."""
        if self.retry_nr <= self.retries:
            _LOGGER.debug("Re-try #%d", self.retry_nr)
            self.retry_nr += 1
            self._send_request()
        else:
            _LOGGER.debug("Re-try #%d, closing socket", self.retry_nr)
            self.transport.close()

    def _timeout_heartbeat(self):
        """Timer callback: no valid response arrived within the timeout."""
        self._timer = None
        if self.on_response_received.done():
            self.transport.close()
        else:
            self._retry_or_close()

In [None]:
class InverterError(Exception):
    """Raised when the inverter did not deliver a valid response to a request."""


class ProtocolCommand:
    """Definition of inverter protocol command"""

    def __init__(self, request: bytes, validator: Callable[[bytes], bool]):
        """
        request -- raw bytes sent to the inverter
        validator -- returns True when the received bytes are an acceptable response
        """
        self.request: bytes = request
        self.validator: Callable[[bytes], bool] = validator

    async def execute(self, host: str, port: int, timeout: int = 2, retries: int = 3) -> bytes:
        """
        Execute the udp protocol command on the specified address/port.
        Since the UDP communication is by definition unreliable, when no (valid) response is received by specified
        timeout, the command will be re-tried up to retries times.

        Return raw response data.
        Raise InverterError when no valid response was received.

        NOTE: any CancelledError raised while waiting is reported as InverterError.
        """
        loop = asyncio.get_running_loop()
        on_response_received = loop.create_future()
        transport, _ = await loop.create_datagram_endpoint(
            lambda: _UdpInverterProtocol(
                self.request, self.validator, on_response_received, timeout, retries
            ),
            remote_addr=(host, port),
        )
        try:
            result = await on_response_received
        except asyncio.exceptions.CancelledError:
            # The protocol cancels the future when the socket is closed
            # without a valid response having been received.
            raise InverterError(
                f"No valid response received to '{self.request.hex()}' request"
            ) from None
        finally:
            # Always release the socket, whatever the outcome.
            transport.close()
        if result is None:
            raise InverterError(
                f"No response received to '{self.request.hex()}' request"
            )
        return result

In [None]:
class Aa55ProtocolCommand(ProtocolCommand):
    """
    Inverter communication protocol based on 0xAA,0x55 kinds of commands.
    Each command starts with header of 0xAA, 0x55, 0xC0, 0x7F followed by payload data.
    It is suffixed with 2 bytes of plain checksum of header+payload.
    """

    def __init__(self, payload: str, response_type: str):
        """
        payload -- hex string of the command payload (without header and checksum)
        response_type -- hex string of the expected response type; an empty string
                         disables the response type check
        """
        frame = bytes.fromhex("AA55C07F" + payload)
        super().__init__(
            frame + self._checksum(frame),
            lambda x: self._validate_response(x, response_type),
        )

    @staticmethod
    def _checksum(data: bytes) -> bytes:
        """Return the plain sum of all bytes as 2 big-endian bytes."""
        return sum(data).to_bytes(2, byteorder="big", signed=False)

    @staticmethod
    def _validate_response(data: bytes, response_type: str) -> bool:
        """
        Validate the response.
        data[0:4] is header
        data[4:6] is response type
        data[6] is response payload length
        data[-2:] is checksum (plain sum of response data incl. header)
        """
        # Header (4) + type (2) + length (1) + checksum (2) = 9 bytes around the payload.
        if len(data) <= 8 or len(data) != data[6] + 9:
            return False
        # Both 2-byte fields are read as unsigned big-endian integers,
        # mirroring the way _checksum() encodes the request checksum.
        if response_type and int(response_type, 16) != int.from_bytes(data[4:6], byteorder="big"):
            return False
        return sum(data[:-2]) == int.from_bytes(data[-2:], byteorder="big")

In [None]:
def _create_crc16_table():
    """Build the 256-entry lookup table used for (modbus) CRC-16 calculation."""

    def entry(byte: int) -> int:
        # Feed the byte into an empty CRC register bit by bit, lowest bit first.
        register = 0
        for shift in range(8):
            feedback = ((byte >> shift) ^ register) & 0x0001
            register >>= 1
            if feedback:
                register ^= 0xA001
        return register

    return tuple(entry(byte) for byte in range(256))


In [None]:
class ModbusProtocolCommand(ProtocolCommand):
    """
    Inverter protocol command whose request is the given hex payload followed by
    the 2 bytes of Modbus CRC-16 checksum computed over that payload.
    """

    _CRC_16_TABLE = _create_crc16_table()

    def __init__(self, payload: str, response_len: int = 0):
        """
        payload -- hex string of the request (checksum is appended automatically)
        response_len -- expected total response length in bytes, 0 to accept any length
        """
        request_hex = payload + self._checksum(bytes.fromhex(payload))

        def validator(response: bytes) -> bool:
            return self._validate_response(response, response_len)

        super().__init__(bytes.fromhex(request_hex), validator)

    @classmethod
    def _checksum(cls, data: bytes) -> str:
        """Return the CRC-16 of data as 4 hex digits, low byte first."""
        table = cls._CRC_16_TABLE
        crc = 0xFFFF
        for byte in data:
            crc = table[(crc ^ byte) & 0xFF] ^ (crc >> 8)
        return crc.to_bytes(2, byteorder="little").hex()

    @classmethod
    def _validate_response(cls, data: bytes, response_len: int) -> bool:
        """
        Check length and checksum of the response.

        The response must be longer than 4 bytes and, when response_len is non-zero,
        exactly response_len bytes long. Its last two bytes must equal the CRC-16
        of data[2:-2], i.e. the first two bytes are not part of the checksum.
        """
        if len(data) <= 4:
            return False
        if response_len != 0 and len(data) != response_len:
            return False
        return data[-2:].hex() == cls._checksum(data[2:-2])

In [None]:
host= '192.168.0.48'
port= 8899
timeout = 2 #s
retries = 2

response = await ModbusProtocolCommand("F70388b800213AC1", 73).execute(host, port, timeout, retries)
print(response.hex())
print(response[11:27].decode("ascii").rstrip())
print(response[27:36].decode("ascii").rstrip())
print(response[59:71].decode("ascii").rstrip())

In [None]:
response = await ModbusProtocolCommand("F703891c007d7ae7", 257).execute(host, port, timeout, retries)
# response = bytes.fromhex("aa55f703fa150808160c2800000000000000000000000000000000ffffffffffffffffffffffffffffffff0000000009200000138800000000ffffffffffff7fffffffffffffffffff7fffffff0001000000000000fe8e7fffffffffffffff000000000000000000000000ffffffffffffffffffffffffffffffffffffffffffffffff000001727fffffff7fffffff00000000000001720000021e7fff013101000b47ffff00000001000000000000008100030000ffff000000000000025c000000e6000002560000002400e4000000000000000002d30125000000000000000000000000000700000000000000000000400000010308484600000000ffffb70e")

print(response.hex())
print('------')
offset = 5
pos = 152 + offset
size = 2


value = int.from_bytes(response[pos:pos+size], byteorder="big", signed=True)
if value > 32768:
    value = value - 65535
print(value/10)