Skip to content

Commit

Permalink
Add MailboxService
Browse files Browse the repository at this point in the history
  • Loading branch information
mickeprag committed Apr 23, 2020
1 parent 26270e6 commit d4cf389
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/index.rst
Expand Up @@ -52,4 +52,5 @@ Example for requesting a sensor value from a node supporting sensor multilevel
:maxdepth: 2

inclusion
sleeping
reference/pyzwave
33 changes: 33 additions & 0 deletions docs/sleeping.rst
@@ -0,0 +1,33 @@
Sleeping nodes (battery operated)
==========================================

To talk with battery operated nodes the messages must be queued until the node is awake.

The node will use the WAKE_UP command class to notify when the node is awake. Zipgateway
contains a mailbox proxy to help with queuing the messages. Using this is optional and
the queueing can be implemented by the end application instead.

Mailbox service
---------------

To use the mailbox proxy in Zipgateway the application must have a
:class:`MailboxService <pyzwave.mailbox.MailboxService>` configured.
The ip address and port should be the same as the :class:`ZipGateway <pyzwave.zipgateway.ZIPGateway>`
is configured to listen on.

.. code::
import ipaddress
from pyzwave.mailbox import MailboxService
mailbox = MailboxService(adapter)
await mailbox.initialize(ipaddress.IPv6Address("::ffff:c0a8:31"), 4123)
Sending messages
----------------

When a :class:`MailboxService <pyzwave.mailbox.MailboxService>` has been configured the
:func:`Adapter.send() <pyzwave.adapter.Adapter.send>` method will block until the node
either wakes up or is considered dead.

This can be a long time (week or even months). Please make sure the code can handle this.
11 changes: 10 additions & 1 deletion pyzwave/adapter.py
Expand Up @@ -190,7 +190,16 @@ async def removeNodeStop(self) -> bool:
async def send(
self, cmd: Message, sourceEP: int = 0, destEP: int = 0, timeout: int = 3
) -> bool:
"""Send message to Z-Wave chip. Must be implemented in subclass"""
"""
Send message to Z-Wave chip. Must be implemented in subclass.
.. warning::
This command will block until the message has been ACKed by the node.
When talking to battery operated (sleeping) nodes this command will
block until the nodes wakes up or is considered dead. This can be a
long time (week or even months). Please make sure the code can handle this.
"""
raise NotImplementedError()

async def sendToNode(self, nodeId: int, cmd: Message, **kwargs) -> bool:
Expand Down
163 changes: 163 additions & 0 deletions pyzwave/mailbox.py
@@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
import asyncio
from ipaddress import IPv6Address
import logging

import crcmod.predefined

from pyzwave.adapter import Adapter
from pyzwave.commandclass import Mailbox, Zip
from pyzwave.message import Message

_LOGGER = logging.getLogger(__name__)


class QueueItem:
"""
Class for holding one queue entry. It is also responsible for sending
it's heartbeats
"""

def __init__(self, nodeId: int, handle: int, data: bytes, adapter):
self._adapter = adapter
self._nodeId = nodeId
self._handle = handle
self._data = data
self._task = None

@property
def checksum(self) -> int:
"""Return a checksum of the data"""
# From the docs
# To avoid duplicate entries, the Mailbox Service MUST maintain a list of CRC16
# checksums for each mailbox entry. All mailbox entries MUST be unique, if a
# matching CRC16 exists for an incoming package, the incoming package MUST be
# discarded.
crc16 = crcmod.predefined.mkCrcFun("crc-aug-ccitt")
return crc16(self._data)

@property
def data(self) -> bytes:
"""The qctual queue data"""
return self._data

def start(self):
"""Start the task for sending heartbeats to the mailbox proxy"""
self._task = asyncio.ensure_future(self._runner())

def stop(self):
"""Stop sending heartbeats"""
if self._task:
self._task.cancel()

async def _runner(self):
i = 0
while True:
i += 1
await asyncio.sleep(60)
operation = Mailbox.Queue.Operation.WAITING
if i % 10 == 0:
operation = Mailbox.Queue.Operation.PING
await self._adapter.sendToNode(
self._nodeId,
Mailbox.Queue(
last=False,
operation=operation,
queueHandle=self._handle,
mailboxEntry=self._data,
),
)


class MailboxService:
"""Mailbox for storing messages for sleeping nodes"""

def __init__(self, adapter):
self._adapter: Adapter = adapter
self._adapter.addListener(self)
self._lastQueueId = None
self._queue = {}

async def initialize(self, ipaddress: IPv6Address, port: int) -> bool:
"""Initialize the mailbox"""
msg = Mailbox.ConfigurationSet(
mode=Mailbox.Mode.ENABLE_MAILBOX_PROXY_FORWARDING,
forwardingDestinationIPv6Address=ipaddress,
udpPortNumber=port,
)
return await self._adapter.send(msg)

async def __popQueue__(self, nodeId: int, queueHandle: int):
_LOGGER.debug("Pop queue (%d)", len(self._queue.get(queueHandle, [])))
if not self._queue.get(queueHandle, None):
return await self._adapter.sendToNode(
nodeId,
Mailbox.Queue(
last=True,
operation=Mailbox.Queue.Operation.POP,
queueHandle=queueHandle,
),
)
queueItem = self._queue[queueHandle].pop()
ret = await self._adapter.sendToNode(
nodeId,
Mailbox.Queue(
last=len(self._queue[queueHandle]) == 0,
operation=Mailbox.Queue.Operation.POP,
queueHandle=queueHandle,
mailboxEntry=queueItem.data,
),
)
_LOGGER.info("Sent message from queue: %s", ret)
if ret:
queueItem.stop()
return True

async def __pushQueue__(self, nodeId: int, queueHandle: int, message: bytes):
queueItem = QueueItem(nodeId, queueHandle, message, self._adapter)
checksum = queueItem.checksum
for item in self._queue.setdefault(queueHandle, []):
if item.checksum == checksum:
# Message has already been queueud
return False
ret = await self._adapter.sendToNode(
nodeId,
Mailbox.Queue(
last=False,
operation=Mailbox.Queue.Operation.WAITING,
queueHandle=queueHandle,
mailboxEntry=message,
),
)
if ret:
queueItem.start()
self._queue[queueHandle].append(queueItem)
return ret

async def messageReceived(
self,
_sender,
rootNodeId: int,
_endPoint: int,
message: Message,
_flags: Zip.HeaderExtension,
):
"""Handle incoming mailbox messages"""
if isinstance(message, Mailbox.WakeupNotification):
self._lastQueueId = message.queueHandle
return await self.__popQueue__(rootNodeId, message.queueHandle)

if (
isinstance(message, Mailbox.Queue)
and message.operation == Mailbox.Queue.Operation.ACK
):
queueHandle = message.queueHandle or self._lastQueueId
return await self.__popQueue__(rootNodeId, queueHandle)

if (
isinstance(message, Mailbox.Queue)
and message.operation == Mailbox.Queue.Operation.PUSH
):
return await self.__pushQueue__(
rootNodeId, message.queueHandle, message.mailboxEntry
)
175 changes: 175 additions & 0 deletions tests/test_mailbox.py
@@ -0,0 +1,175 @@
# pylint: disable=missing-function-docstring
# pylint: disable=missing-class-docstring
# pylint: disable=invalid-name
# pylint: disable=redefined-outer-name
# pylint: disable=protected-access
import asyncio
import ipaddress

from unittest.mock import patch, MagicMock

import pytest

from pyzwave.commandclass import Mailbox
from pyzwave.mailbox import MailboxService, QueueItem
from test_adaper import AdapterImpl

sleep = asyncio.sleep


class AsyncMock(MagicMock):
# pylint: disable=useless-super-delegation
async def __call__(self, *args, **kwargs):
return super().__call__(*args, **kwargs)


class SleepMock(MagicMock):
# pylint: disable=arguments-differ
async def __call__(self, time):
return await sleep(0)


@pytest.fixture
def mailbox() -> MailboxService:
adapter = AdapterImpl()
adapter.send = AsyncMock(return_value=True)
adapter.sendToNode = AsyncMock(return_value=True)
return MailboxService(adapter)


@pytest.mark.asyncio
async def test_initialize(mailbox: MailboxService):
assert (
await mailbox.initialize(ipaddress.IPv6Address("::ffff:c0a8:31"), 4123) is True
)


@pytest.mark.asyncio
async def test_popQueue(mailbox: MailboxService):
assert await mailbox.__pushQueue__(1, 3, b"hello") is True
mailbox._adapter.sendToNode.reset_mock()

assert await mailbox.__popQueue__(1, 3) is True
mailbox._adapter.sendToNode.assert_called_once_with(
1,
Mailbox.Queue(
last=True,
operation=Mailbox.Queue.Operation.POP,
queueHandle=3,
mailboxEntry=b"hello",
),
)


@pytest.mark.asyncio
async def test_messageReceived_QueueAck(mailbox: MailboxService):
assert (
await mailbox.messageReceived(
None,
1,
0,
Mailbox.Queue(
last=False, operation=Mailbox.Queue.Operation.ACK, queueHandle=42
),
0,
)
is True
)


@pytest.mark.asyncio
async def test_messageReceived_QueuePush(mailbox: MailboxService):
assert (
await mailbox.messageReceived(
None,
1,
0,
Mailbox.Queue(
last=False,
operation=Mailbox.Queue.Operation.PUSH,
queueHandle=42,
mailboxEntry=b"hello",
),
0,
)
is True
)
mailbox._adapter.sendToNode.assert_called_once_with(
1,
Mailbox.Queue(
last=False,
operation=Mailbox.Queue.Operation.WAITING,
queueHandle=42,
mailboxEntry=b"hello",
),
)


@pytest.mark.asyncio
async def test_messageReceived_WakeupNotification(mailbox: MailboxService):
assert mailbox._lastQueueId is None
assert (
await mailbox.messageReceived(
None, 1, 0, Mailbox.WakeupNotification(queueHandle=42), 0
)
is True
)
assert mailbox._lastQueueId == 42


@pytest.mark.asyncio
async def test_popQueue_empty(mailbox: MailboxService):
assert await mailbox.__popQueue__(1, 3) is True
mailbox._adapter.sendToNode.assert_called_once_with(
1,
Mailbox.Queue(last=True, operation=Mailbox.Queue.Operation.POP, queueHandle=3,),
)


@pytest.mark.asyncio
async def test_pushQueue(mailbox: MailboxService):
assert len(mailbox._queue) == 0
assert await mailbox.__pushQueue__(1, 3, b"hello") is True
assert len(mailbox._queue[3]) == 1

# Adding the same message again should be discarded
assert await mailbox.__pushQueue__(1, 3, b"hello") is False
assert len(mailbox._queue[3]) == 1


@pytest.mark.asyncio
async def test_QueueItem_runner(mailbox: MailboxService):
async def runner():
with patch("asyncio.sleep", new_callable=SleepMock):
queueItem = QueueItem(1, 2, b"hello", mailbox._adapter)
await queueItem._runner()

task = asyncio.ensure_future(runner())

await sleep(0)
# Make sure the first 9 minutes it's sending waiting messages
for _ in range(9):
mailbox._adapter.sendToNode.reset_mock()
await sleep(0)
mailbox._adapter.sendToNode.assert_called_once_with(
1,
Mailbox.Queue(
last=False,
operation=Mailbox.Queue.Operation.WAITING,
queueHandle=2,
mailboxEntry=b"hello",
),
)
# After 10 minutes there should be a ping instead
mailbox._adapter.sendToNode.reset_mock()
await sleep(0)
mailbox._adapter.sendToNode.assert_called_once_with(
1,
Mailbox.Queue(
last=False,
operation=Mailbox.Queue.Operation.PING,
queueHandle=2,
mailboxEntry=b"hello",
),
)
task.cancel()

0 comments on commit d4cf389

Please sign in to comment.