diff --git a/docs/index.rst b/docs/index.rst index f49cfa6..27737f7 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -52,4 +52,5 @@ Example for requesting a sensor value from a node supporting sensor multilevel :maxdepth: 2 inclusion + sleeping reference/pyzwave diff --git a/docs/sleeping.rst b/docs/sleeping.rst new file mode 100644 index 0000000..6b8953c --- /dev/null +++ b/docs/sleeping.rst @@ -0,0 +1,33 @@ +Sleeping nodes (battery operated) +========================================== + +To talk with battery operated nodes the messages must be queued until the node is awake. + +The node will use the WAKE_UP command class to notify when the node is awake. Zipgateway +contains a mailbox proxy to help with queuing the messages. Using this is optional and +the queueing can be implemented by the end application instead. + +Mailbox service +--------------- + +To use the mailbox proxy in Zipgateway the application must have a +:class:`MailboxService ` configured. +The ip address and port should be the same as the :class:`ZipGateway ` +is configured to listen on. + +.. code:: + + import ipaddress + from pyzwave.mailbox import MailboxService + + mailbox = MailboxService(adapter) + await mailbox.initialize(ipaddress.IPv6Address("::ffff:c0a8:31"), 4123) + +Sending messages +---------------- + +When a :class:`MailboxService ` has been configured the +:func:`Adapter.send() ` method will block until the node +either wakes up or is considered dead. + +This can be a long time (week or even months). Please make sure the code can handle this. diff --git a/pyzwave/adapter.py b/pyzwave/adapter.py index c624aec..4a6e05f 100644 --- a/pyzwave/adapter.py +++ b/pyzwave/adapter.py @@ -190,7 +190,16 @@ async def removeNodeStop(self) -> bool: async def send( self, cmd: Message, sourceEP: int = 0, destEP: int = 0, timeout: int = 3 ) -> bool: - """Send message to Z-Wave chip. Must be implemented in subclass""" + """ + Send message to Z-Wave chip. Must be implemented in subclass. + + .. warning:: + This command will block until the message has been ACKed by the node. + + When talking to battery operated (sleeping) nodes this command will + block until the nodes wakes up or is considered dead. This can be a + long time (week or even months). Please make sure the code can handle this. + """ raise NotImplementedError() async def sendToNode(self, nodeId: int, cmd: Message, **kwargs) -> bool: diff --git a/pyzwave/mailbox.py b/pyzwave/mailbox.py new file mode 100644 index 0000000..68ca9af --- /dev/null +++ b/pyzwave/mailbox.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +import asyncio +from ipaddress import IPv6Address +import logging + +import crcmod.predefined + +from pyzwave.adapter import Adapter +from pyzwave.commandclass import Mailbox, Zip +from pyzwave.message import Message + +_LOGGER = logging.getLogger(__name__) + + +class QueueItem: + """ + Class for holding one queue entry. It is also responsible for sending + it's heartbeats + """ + + def __init__(self, nodeId: int, handle: int, data: bytes, adapter): + self._adapter = adapter + self._nodeId = nodeId + self._handle = handle + self._data = data + self._task = None + + @property + def checksum(self) -> int: + """Return a checksum of the data""" + # From the docs + # To avoid duplicate entries, the Mailbox Service MUST maintain a list of CRC16 + # checksums for each mailbox entry. All mailbox entries MUST be unique, if a + # matching CRC16 exists for an incoming package, the incoming package MUST be + # discarded. + crc16 = crcmod.predefined.mkCrcFun("crc-aug-ccitt") + return crc16(self._data) + + @property + def data(self) -> bytes: + """The qctual queue data""" + return self._data + + def start(self): + """Start the task for sending heartbeats to the mailbox proxy""" + self._task = asyncio.ensure_future(self._runner()) + + def stop(self): + """Stop sending heartbeats""" + if self._task: + self._task.cancel() + + async def _runner(self): + i = 0 + while True: + i += 1 + await asyncio.sleep(60) + operation = Mailbox.Queue.Operation.WAITING + if i % 10 == 0: + operation = Mailbox.Queue.Operation.PING + await self._adapter.sendToNode( + self._nodeId, + Mailbox.Queue( + last=False, + operation=operation, + queueHandle=self._handle, + mailboxEntry=self._data, + ), + ) + + +class MailboxService: + """Mailbox for storing messages for sleeping nodes""" + + def __init__(self, adapter): + self._adapter: Adapter = adapter + self._adapter.addListener(self) + self._lastQueueId = None + self._queue = {} + + async def initialize(self, ipaddress: IPv6Address, port: int) -> bool: + """Initialize the mailbox""" + msg = Mailbox.ConfigurationSet( + mode=Mailbox.Mode.ENABLE_MAILBOX_PROXY_FORWARDING, + forwardingDestinationIPv6Address=ipaddress, + udpPortNumber=port, + ) + return await self._adapter.send(msg) + + async def __popQueue__(self, nodeId: int, queueHandle: int): + _LOGGER.debug("Pop queue (%d)", len(self._queue.get(queueHandle, []))) + if not self._queue.get(queueHandle, None): + return await self._adapter.sendToNode( + nodeId, + Mailbox.Queue( + last=True, + operation=Mailbox.Queue.Operation.POP, + queueHandle=queueHandle, + ), + ) + queueItem = self._queue[queueHandle].pop() + ret = await self._adapter.sendToNode( + nodeId, + Mailbox.Queue( + last=len(self._queue[queueHandle]) == 0, + operation=Mailbox.Queue.Operation.POP, + queueHandle=queueHandle, + mailboxEntry=queueItem.data, + ), + ) + _LOGGER.info("Sent message from queue: %s", ret) + if ret: + queueItem.stop() + return True + + async def __pushQueue__(self, nodeId: int, queueHandle: int, message: bytes): + queueItem = QueueItem(nodeId, queueHandle, message, self._adapter) + checksum = queueItem.checksum + for item in self._queue.setdefault(queueHandle, []): + if item.checksum == checksum: + # Message has already been queueud + return False + ret = await self._adapter.sendToNode( + nodeId, + Mailbox.Queue( + last=False, + operation=Mailbox.Queue.Operation.WAITING, + queueHandle=queueHandle, + mailboxEntry=message, + ), + ) + if ret: + queueItem.start() + self._queue[queueHandle].append(queueItem) + return ret + + async def messageReceived( + self, + _sender, + rootNodeId: int, + _endPoint: int, + message: Message, + _flags: Zip.HeaderExtension, + ): + """Handle incoming mailbox messages""" + if isinstance(message, Mailbox.WakeupNotification): + self._lastQueueId = message.queueHandle + return await self.__popQueue__(rootNodeId, message.queueHandle) + + if ( + isinstance(message, Mailbox.Queue) + and message.operation == Mailbox.Queue.Operation.ACK + ): + queueHandle = message.queueHandle or self._lastQueueId + return await self.__popQueue__(rootNodeId, queueHandle) + + if ( + isinstance(message, Mailbox.Queue) + and message.operation == Mailbox.Queue.Operation.PUSH + ): + return await self.__pushQueue__( + rootNodeId, message.queueHandle, message.mailboxEntry + ) diff --git a/tests/test_mailbox.py b/tests/test_mailbox.py new file mode 100644 index 0000000..53785b2 --- /dev/null +++ b/tests/test_mailbox.py @@ -0,0 +1,175 @@ +# pylint: disable=missing-function-docstring +# pylint: disable=missing-class-docstring +# pylint: disable=invalid-name +# pylint: disable=redefined-outer-name +# pylint: disable=protected-access +import asyncio +import ipaddress + +from unittest.mock import patch, MagicMock + +import pytest + +from pyzwave.commandclass import Mailbox +from pyzwave.mailbox import MailboxService, QueueItem +from test_adaper import AdapterImpl + +sleep = asyncio.sleep + + +class AsyncMock(MagicMock): + # pylint: disable=useless-super-delegation + async def __call__(self, *args, **kwargs): + return super().__call__(*args, **kwargs) + + +class SleepMock(MagicMock): + # pylint: disable=arguments-differ + async def __call__(self, time): + return await sleep(0) + + +@pytest.fixture +def mailbox() -> MailboxService: + adapter = AdapterImpl() + adapter.send = AsyncMock(return_value=True) + adapter.sendToNode = AsyncMock(return_value=True) + return MailboxService(adapter) + + +@pytest.mark.asyncio +async def test_initialize(mailbox: MailboxService): + assert ( + await mailbox.initialize(ipaddress.IPv6Address("::ffff:c0a8:31"), 4123) is True + ) + + +@pytest.mark.asyncio +async def test_popQueue(mailbox: MailboxService): + assert await mailbox.__pushQueue__(1, 3, b"hello") is True + mailbox._adapter.sendToNode.reset_mock() + + assert await mailbox.__popQueue__(1, 3) is True + mailbox._adapter.sendToNode.assert_called_once_with( + 1, + Mailbox.Queue( + last=True, + operation=Mailbox.Queue.Operation.POP, + queueHandle=3, + mailboxEntry=b"hello", + ), + ) + + +@pytest.mark.asyncio +async def test_messageReceived_QueueAck(mailbox: MailboxService): + assert ( + await mailbox.messageReceived( + None, + 1, + 0, + Mailbox.Queue( + last=False, operation=Mailbox.Queue.Operation.ACK, queueHandle=42 + ), + 0, + ) + is True + ) + + +@pytest.mark.asyncio +async def test_messageReceived_QueuePush(mailbox: MailboxService): + assert ( + await mailbox.messageReceived( + None, + 1, + 0, + Mailbox.Queue( + last=False, + operation=Mailbox.Queue.Operation.PUSH, + queueHandle=42, + mailboxEntry=b"hello", + ), + 0, + ) + is True + ) + mailbox._adapter.sendToNode.assert_called_once_with( + 1, + Mailbox.Queue( + last=False, + operation=Mailbox.Queue.Operation.WAITING, + queueHandle=42, + mailboxEntry=b"hello", + ), + ) + + +@pytest.mark.asyncio +async def test_messageReceived_WakeupNotification(mailbox: MailboxService): + assert mailbox._lastQueueId is None + assert ( + await mailbox.messageReceived( + None, 1, 0, Mailbox.WakeupNotification(queueHandle=42), 0 + ) + is True + ) + assert mailbox._lastQueueId == 42 + + +@pytest.mark.asyncio +async def test_popQueue_empty(mailbox: MailboxService): + assert await mailbox.__popQueue__(1, 3) is True + mailbox._adapter.sendToNode.assert_called_once_with( + 1, + Mailbox.Queue(last=True, operation=Mailbox.Queue.Operation.POP, queueHandle=3,), + ) + + +@pytest.mark.asyncio +async def test_pushQueue(mailbox: MailboxService): + assert len(mailbox._queue) == 0 + assert await mailbox.__pushQueue__(1, 3, b"hello") is True + assert len(mailbox._queue[3]) == 1 + + # Adding the same message again should be discarded + assert await mailbox.__pushQueue__(1, 3, b"hello") is False + assert len(mailbox._queue[3]) == 1 + + +@pytest.mark.asyncio +async def test_QueueItem_runner(mailbox: MailboxService): + async def runner(): + with patch("asyncio.sleep", new_callable=SleepMock): + queueItem = QueueItem(1, 2, b"hello", mailbox._adapter) + await queueItem._runner() + + task = asyncio.ensure_future(runner()) + + await sleep(0) + # Make sure the first 9 minutes it's sending waiting messages + for _ in range(9): + mailbox._adapter.sendToNode.reset_mock() + await sleep(0) + mailbox._adapter.sendToNode.assert_called_once_with( + 1, + Mailbox.Queue( + last=False, + operation=Mailbox.Queue.Operation.WAITING, + queueHandle=2, + mailboxEntry=b"hello", + ), + ) + # After 10 minutes there should be a ping instead + mailbox._adapter.sendToNode.reset_mock() + await sleep(0) + mailbox._adapter.sendToNode.assert_called_once_with( + 1, + Mailbox.Queue( + last=False, + operation=Mailbox.Queue.Operation.PING, + queueHandle=2, + mailboxEntry=b"hello", + ), + ) + task.cancel()