Skip to content

Commit

Permalink
Increase Device coverage (#16)
Browse files Browse the repository at this point in the history
* Increase device coverage

* increase minimum coverage %

* update ruff in precommit
  • Loading branch information
dmulcahey committed Mar 28, 2024
1 parent da6f8ad commit 546033d
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ jobs:
CACHE_VERSION: 2
PYTHON_VERSION_DEFAULT: 3.12
PRE_COMMIT_CACHE_PATH: ~/.cache/pre-commit
MINIMUM_COVERAGE_PERCENTAGE: 92
MINIMUM_COVERAGE_PERCENTAGE: 93
PYTHON_MATRIX: "3.12"
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ repos:
- id: mypy

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.3.2
rev: v0.3.4
hooks:
- id: ruff
args: ["--fix", "--exit-non-zero-on-fix", "--config", "pyproject.toml"]
323 changes: 321 additions & 2 deletions tests/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,29 @@
import logging
import time
from unittest import mock
from unittest.mock import patch
from unittest.mock import call, patch

import pytest
from zigpy.device import Device as ZigpyDevice
from zigpy.exceptions import ZigbeeException
import zigpy.profiles.zha
import zigpy.types
from zigpy.zcl.clusters import general
from zigpy.zcl.foundation import Status, WriteAttributesResponse
import zigpy.zdo.types as zdo_t

from tests.conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_TYPE
from zha.application.const import (
CLUSTER_COMMAND_SERVER,
CLUSTER_COMMANDS_CLIENT,
CLUSTER_COMMANDS_SERVER,
CLUSTER_TYPE_IN,
CLUSTER_TYPE_OUT,
)
from zha.application.gateway import Gateway
from zha.zigbee.device import Device
from zha.exceptions import ZHAException
from zha.zigbee.device import ClusterBinding, Device
from zha.zigbee.group import Group


@pytest.fixture
Expand Down Expand Up @@ -265,3 +276,311 @@ async def test_device_is_active_coordinator(

assert current_coordinator.is_active_coordinator
assert not stale_coordinator.is_active_coordinator


async def test_async_get_clusters(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
) -> None:
"""Test async_get_clusters method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)

assert zha_device.async_get_clusters() == {
3: {
CLUSTER_TYPE_IN: {
general.Basic.cluster_id: zigpy_dev.endpoints[3].in_clusters[
general.Basic.cluster_id
],
general.OnOff.cluster_id: zigpy_dev.endpoints[3].in_clusters[
general.OnOff.cluster_id
],
},
CLUSTER_TYPE_OUT: {},
}
}


async def test_async_get_groupable_endpoints(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
) -> None:
"""Test async_get_groupable_endpoints method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zigpy_dev.endpoints[3].add_input_cluster(general.Groups.cluster_id)
zha_device = await device_joined(zigpy_dev)

assert zha_device.async_get_groupable_endpoints() == [3]


async def test_async_get_std_clusters(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
) -> None:
"""Test async_get_std_clusters method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zigpy_dev.endpoints[3].profile_id = zigpy.profiles.zha.PROFILE_ID
zha_device = await device_joined(zigpy_dev)

assert zha_device.async_get_std_clusters() == {
3: {
CLUSTER_TYPE_IN: {
general.Basic.cluster_id: zigpy_dev.endpoints[3].in_clusters[
general.Basic.cluster_id
],
general.OnOff.cluster_id: zigpy_dev.endpoints[3].in_clusters[
general.OnOff.cluster_id
],
},
CLUSTER_TYPE_OUT: {},
}
}


async def test_async_get_cluster(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
) -> None:
"""Test async_get_cluster method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)

assert zha_device.async_get_cluster(3, general.OnOff.cluster_id) == (
zigpy_dev.endpoints[3].on_off
)


async def test_async_get_cluster_attributes(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
) -> None:
"""Test async_get_cluster_attributes method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)

assert (
zha_device.async_get_cluster_attributes(3, general.OnOff.cluster_id)
== zigpy_dev.endpoints[3].on_off.attributes
)


async def test_async_get_cluster_commands(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
) -> None:
"""Test async_get_cluster_commands method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)

assert zha_device.async_get_cluster_commands(3, general.OnOff.cluster_id) == {
CLUSTER_COMMANDS_CLIENT: zigpy_dev.endpoints[3].on_off.client_commands,
CLUSTER_COMMANDS_SERVER: zigpy_dev.endpoints[3].on_off.server_commands,
}


async def test_write_zigbee_attribute(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
) -> None:
"""Test write_zigbee_attribute method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)

with pytest.raises(
ValueError,
match="Cluster 8 not found on endpoint 3 while writing attribute 0 with value 20",
):
await zha_device.write_zigbee_attribute(
3,
general.LevelControl.cluster_id,
general.LevelControl.AttributeDefs.current_level.id,
20,
)

cluster = zigpy_dev.endpoints[3].on_off
cluster.write_attributes.reset_mock()

response: WriteAttributesResponse = await zha_device.write_zigbee_attribute(
3,
general.OnOff.cluster_id,
general.OnOff.AttributeDefs.start_up_on_off.id,
general.OnOff.StartUpOnOff.PreviousValue,
)

assert response is not None
assert len(response) == 1
status_record = response[0][0]
assert status_record.status == Status.SUCCESS

assert cluster.write_attributes.await_count == 1
assert cluster.write_attributes.await_args == call(
{
general.OnOff.AttributeDefs.start_up_on_off.id: general.OnOff.StartUpOnOff.PreviousValue
},
manufacturer=None,
)

cluster.write_attributes.reset_mock()
cluster.write_attributes.side_effect = ZigbeeException
m1 = "Failed to set attribute: value: <StartUpOnOff.PreviousValue: 255> "
m2 = "attribute: 16387 cluster_id: 6 endpoint_id: 3"
with pytest.raises(
ZHAException,
match=f"{m1}{m2}",
):
await zha_device.write_zigbee_attribute(
3,
general.OnOff.cluster_id,
general.OnOff.AttributeDefs.start_up_on_off.id,
general.OnOff.StartUpOnOff.PreviousValue,
)

cluster = zigpy_dev.endpoints[3].on_off
cluster.write_attributes.reset_mock()


async def test_issue_cluster_command(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
) -> None:
"""Test issue_cluster_command method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)

with pytest.raises(
ValueError,
match="Cluster 8 not found on endpoint 3 while issuing command 0 with args \\[20\\]",
):
await zha_device.issue_cluster_command(
3,
general.LevelControl.cluster_id,
general.LevelControl.ServerCommandDefs.move_to_level.id,
CLUSTER_COMMAND_SERVER,
[20],
None,
)

cluster = zigpy_dev.endpoints[3].on_off

with patch("zigpy.zcl.Cluster.request", return_value=[0x5, Status.SUCCESS]):
await zha_device.issue_cluster_command(
3,
general.OnOff.cluster_id,
general.OnOff.ServerCommandDefs.on.id,
CLUSTER_COMMAND_SERVER,
None,
{},
)

assert cluster.request.await_count == 1


async def test_async_add_to_group_remove_from_group(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test async_add_to_group and async_remove_from_group methods."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zigpy_dev.endpoints[3].add_input_cluster(general.Groups.cluster_id)
zha_device = await device_joined(zigpy_dev)

group: Group = zha_device.gateway.groups[0x1001]

assert (zha_device.ieee, 3) not in group.zigpy_group.members

await zha_device.async_add_to_group(group.group_id)

assert (zha_device.ieee, 3) in group.zigpy_group.members

await zha_device.async_remove_from_group(group.group_id)

assert (zha_device.ieee, 3) not in group.zigpy_group.members

await zha_device.async_add_endpoint_to_group(3, group.group_id)

assert (zha_device.ieee, 3) in group.zigpy_group.members

await zha_device.async_remove_endpoint_from_group(3, group.group_id)

assert (zha_device.ieee, 3) not in group.zigpy_group.members

with patch("zigpy.device.Device.add_to_group", side_effect=ZigbeeException):
await zha_device.async_add_to_group(group.group_id)
assert (zha_device.ieee, 3) not in group.zigpy_group.members
assert (
"Failed to add device '00:0d:6f:00:0a:90:69:e7' to group: 0x1001"
in caplog.text
)

with patch("zigpy.endpoint.Endpoint.add_to_group", side_effect=ZigbeeException):
await zha_device.async_add_endpoint_to_group(3, group.group_id)
assert (zha_device.ieee, 3) not in group.zigpy_group.members
assert (
"Failed to add endpoint: 3 for device: '00:0d:6f:00:0a:90:69:e7' to group: 0x1001"
in caplog.text
)

# add it
assert (zha_device.ieee, 3) not in group.zigpy_group.members
await zha_device.async_add_to_group(group.group_id)
assert (zha_device.ieee, 3) in group.zigpy_group.members

with patch("zigpy.device.Device.remove_from_group", side_effect=ZigbeeException):
await zha_device.async_remove_from_group(group.group_id)
assert (zha_device.ieee, 3) in group.zigpy_group.members
assert (
"Failed to remove device '00:0d:6f:00:0a:90:69:e7' from group: 0x1001"
in caplog.text
)

with patch(
"zigpy.endpoint.Endpoint.remove_from_group", side_effect=ZigbeeException
):
await zha_device.async_remove_endpoint_from_group(3, group.group_id)
assert (zha_device.ieee, 3) in group.zigpy_group.members
assert (
"Failed to remove endpoint: 3 for device '00:0d:6f:00:0a:90:69:e7' from group: 0x1001"
in caplog.text
)


async def test_async_bind_to_group(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test async_bind_to_group method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zigpy_dev.endpoints[3].add_input_cluster(general.Groups.cluster_id)
zha_device = await device_joined(zigpy_dev)

zigpy_dev_remote = zigpy_device(with_basic_cluster_handler=True)
zigpy_dev_remote._ieee = zigpy.types.EUI64.convert("00:0d:7f:00:0a:90:69:e8")
zigpy_dev_remote.endpoints[3].add_output_cluster(general.OnOff.cluster_id)
zha_device_remote = await device_joined(zigpy_dev_remote)
assert zha_device_remote is not None

group: Group = zha_device.gateway.groups[0x1001]

# add a device to the group for binding
assert (zha_device.ieee, 3) not in group.zigpy_group.members
await zha_device.async_add_to_group(group.group_id)
assert (zha_device.ieee, 3) in group.zigpy_group.members

await zha_device_remote.async_bind_to_group(
group.group_id,
[ClusterBinding(name="on_off", type=CLUSTER_TYPE_OUT, id=6, endpoint_id=3)],
)
assert (
"0xb79c: Bind_req 00:0d:7f:00:0a:90:69:e8, ep: 3, cluster: 6 to group: 0x1001 completed: [<Status.SUCCESS: 0>]"
in caplog.text
)

await zha_device_remote.async_unbind_from_group(
group.group_id,
[ClusterBinding(name="on_off", type=CLUSTER_TYPE_OUT, id=6, endpoint_id=3)],
)

m1 = "0xb79c: Unbind_req 00:0d:7f:00:0a:90:69:e8, ep: 3, cluster: 6"
assert f"{m1} to group: 0x1001 completed: [<Status.SUCCESS: 0>]" in caplog.text
20 changes: 12 additions & 8 deletions zha/zigbee/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
from zigpy.types.named import EUI64, NWK
from zigpy.zcl.clusters import Cluster
from zigpy.zcl.clusters.general import Groups, Identify
from zigpy.zcl.foundation import Status as ZclStatus, ZCLCommandDef
from zigpy.zcl.foundation import (
Status as ZclStatus,
WriteAttributesResponse,
ZCLCommandDef,
)
import zigpy.zdo.types as zdo_types

from zha.application import discovery
Expand Down Expand Up @@ -780,13 +784,13 @@ def async_get_cluster_commands(

async def write_zigbee_attribute(
self,
endpoint_id,
cluster_id,
attribute,
value,
cluster_type=CLUSTER_TYPE_IN,
manufacturer=None,
):
endpoint_id: int,
cluster_id: int,
attribute: int | str,
value: Any,
cluster_type: str = CLUSTER_TYPE_IN,
manufacturer: int | None = None,
) -> WriteAttributesResponse | None:
"""Write a value to a zigbee attribute for a cluster in this entity."""
try:
cluster: Cluster = self.async_get_cluster(
Expand Down
Loading

0 comments on commit 546033d

Please sign in to comment.