Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quirks V2 #1329

Merged
merged 35 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
afe5451
example of quirks v2
dmulcahey Jan 20, 2024
84c2b96
add matcher example
dmulcahey Jan 21, 2024
76d06d5
comparison test
dmulcahey Jan 21, 2024
3b75c5c
small cleanup
dmulcahey Jan 21, 2024
56082e1
add support for custom device classes
dmulcahey Jan 27, 2024
7b6c221
tweak
dmulcahey Jan 27, 2024
3c338bf
add configuration hooks and type metadata options for clusters
dmulcahey Jan 27, 2024
a3763b2
make customdevicev2 extend customdevice
dmulcahey Jan 30, 2024
de3aefe
black
dmulcahey Jan 30, 2024
43e90c0
clean up metadata
dmulcahey Feb 7, 2024
d4205e9
make less verbose
dmulcahey Feb 7, 2024
97a47a2
move quirks v2 to their own module
dmulcahey Feb 7, 2024
5aa23c8
pylint ignores
dmulcahey Feb 7, 2024
58f3058
fix pylint disables after formatting
dmulcahey Feb 9, 2024
946e29a
support matching a single v2 quirk to multiple devices
dmulcahey Feb 9, 2024
bd33eb3
copy cache for clusters when quirking an endpoint
dmulcahey Feb 9, 2024
7b26a4c
add zcl command button metadata
dmulcahey Feb 9, 2024
343dabb
only copy attr cache for CustomDeviceV2
dmulcahey Feb 9, 2024
e190ca7
filter zdo during custom configuration
dmulcahey Feb 10, 2024
f541734
sensor metadata
dmulcahey Feb 10, 2024
519a17c
remove patches until I can figure it out
dmulcahey Feb 10, 2024
0c34f4d
fix cluster attr cache copy and device triggers
dmulcahey Feb 10, 2024
b79f27a
coverage
dmulcahey Feb 10, 2024
8ef2507
remove unused stuff
dmulcahey Feb 10, 2024
0c3cfa8
clean up pylint hints
dmulcahey Feb 11, 2024
bccd751
add command button
dmulcahey Feb 11, 2024
3a0137b
format
dmulcahey Feb 12, 2024
246c67a
add support for overridding node descriptor and skipping configuration
dmulcahey Feb 12, 2024
1456513
support HA device class, state class and units
dmulcahey Feb 12, 2024
24b7d71
add support for different off and on values for switch
dmulcahey Feb 13, 2024
6d83461
initially_disabled and attribute_initialized_from_cache
dmulcahey Feb 20, 2024
8f5e9e1
add translation_key
dmulcahey Feb 21, 2024
6f4c898
pylint directive cleanup
dmulcahey Feb 21, 2024
78f9a5b
add unit to sensor
dmulcahey Feb 22, 2024
ea0b67c
remove decimals from sensor
dmulcahey Feb 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
848 changes: 848 additions & 0 deletions tests/test_quirks_v2.py

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions zigpy/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,11 @@ def __init__(

class CorruptDatabase(ZigbeeException):
"""The SQLite database is corrupt or otherwise inconsistent"""


class QuirksException(Exception):
"""Base exception class"""


class MultipleQuirksMatchException(QuirksException):
"""Thrown when multiple v2 quirks match a device"""
123 changes: 120 additions & 3 deletions zigpy/quirks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Zigpy quirks module."""
from __future__ import annotations

import logging
Expand All @@ -17,7 +18,7 @@
)
import zigpy.device
import zigpy.endpoint
from zigpy.quirks.registry import DeviceRegistry # noqa: F401
from zigpy.quirks.registry import DeviceRegistry
import zigpy.types as t
from zigpy.types.basic import uint16_t
import zigpy.zcl
Expand Down Expand Up @@ -63,6 +64,10 @@ def register_uninitialized_device_message_handler(handler: typing.Callable) -> N


class CustomDevice(zigpy.device.Device):
"""Implementation of a quirks v1 custom device."""

_copy_cluster_attr_cache = False

replacement: dict[str, typing.Any] = {}
signature = None

Expand Down Expand Up @@ -117,6 +122,8 @@ def add_endpoint(


class CustomEndpoint(zigpy.endpoint.Endpoint):
"""Custom endpoint implementation for quirks."""

def __init__(
self,
device: CustomDevice,
Expand All @@ -143,7 +150,17 @@ def set_device_attr(attr):
else:
cluster = c(self, is_server=True)
cluster_id = cluster.cluster_id
self.add_input_cluster(cluster_id, cluster)
cluster = self.add_input_cluster(cluster_id, cluster)
if self.device._copy_cluster_attr_cache:
if (
endpoint_id in replace_device.endpoints
and cluster_id in replace_device.endpoints[endpoint_id].in_clusters
):
cluster._attr_cache = (
replace_device[endpoint_id]
.in_clusters[cluster_id]
._attr_cache.copy()
)

for c in replacement_data.get(SIG_EP_OUTPUT, []):
if isinstance(c, int):
Expand All @@ -152,10 +169,22 @@ def set_device_attr(attr):
else:
cluster = c(self, is_server=False)
cluster_id = cluster.cluster_id
self.add_output_cluster(cluster_id, cluster)
cluster = self.add_output_cluster(cluster_id, cluster)
if self.device._copy_cluster_attr_cache:
if (
endpoint_id in replace_device.endpoints
and cluster_id in replace_device.endpoints[endpoint_id].out_clusters
):
cluster._attr_cache = (
replace_device[endpoint_id]
.out_clusters[cluster_id]
._attr_cache.copy()
)


class CustomCluster(zigpy.zcl.Cluster):
"""Custom cluster implementation for quirks."""

_skip_registry = True
_CONSTANT_ATTRIBUTES: dict[int, typing.Any] | None = None

Expand Down Expand Up @@ -356,6 +385,94 @@ def get(self, key: int | str, default: typing.Any | None = None) -> typing.Any:

return super().get(key, default)

async def apply_custom_configuration(self, *args, **kwargs):
"""Hook for applications to instruct instances to apply custom configuration."""


FilterType = typing.Callable[
[zigpy.device.Device],
bool,
]


def signature_matches(
signature: dict[str, typing.Any],
) -> FilterType:
"""Return True if device matches signature."""

def _match(a: dict | typing.Iterable, b: dict | typing.Iterable) -> bool:
return set(a) == set(b)

def _filter(device: zigpy.device.Device) -> bool:
"""Return True if device matches signature."""
if device.model != signature.get(SIG_MODEL, device.model):
_LOGGER.debug("Fail, because device model mismatch: '%s'", device.model)
return False

if device.manufacturer != signature.get(SIG_MANUFACTURER, device.manufacturer):
_LOGGER.debug(
"Fail, because device manufacturer mismatch: '%s'",
device.manufacturer,
)
return False

dev_ep = set(device.endpoints) - {0}

sig = signature.get(SIG_ENDPOINTS)
if sig is None:
return False

if not _match(sig, dev_ep):
_LOGGER.debug(
"Fail because endpoint list mismatch: %s %s",
set(sig.keys()),
dev_ep,
)
return False

if not all(
device[eid].profile_id
== sig[eid].get(SIG_EP_PROFILE, device[eid].profile_id)
for eid in sig
):
_LOGGER.debug("Fail because profile_id mismatch on at least one endpoint")
return False

if not all(
device[eid].device_type
== sig[eid].get(SIG_EP_TYPE, device[eid].device_type)
for eid in sig
):
_LOGGER.debug("Fail because device_type mismatch on at least one endpoint")
return False

if not all(
_match(device[eid].in_clusters, ep.get(SIG_EP_INPUT, []))
for eid, ep in sig.items()
):
_LOGGER.debug(
"Fail because input cluster mismatch on at least one endpoint"
)
return False

if not all(
_match(device[eid].out_clusters, ep.get(SIG_EP_OUTPUT, []))
for eid, ep in sig.items()
):
_LOGGER.debug(
"Fail because output cluster mismatch on at least one endpoint"
)
return False

_LOGGER.debug(
"Device matches filter signature - device ieee[%s]: filter signature[%s]",
device.ieee,
signature,
)
return True

return _filter


def handle_message_from_uninitialized_sender(
sender: zigpy.device.Device,
Expand Down
123 changes: 47 additions & 76 deletions zigpy/quirks/registry.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
"""Zigpy quirks registry."""
from __future__ import annotations

import collections
import itertools
import logging
import typing
from typing import TYPE_CHECKING

from zigpy.const import (
SIG_ENDPOINTS,
SIG_EP_INPUT,
SIG_EP_OUTPUT,
SIG_EP_PROFILE,
SIG_EP_TYPE,
SIG_MANUFACTURER,
SIG_MODEL,
SIG_MODELS_INFO,
)
from zigpy.const import SIG_MANUFACTURER, SIG_MODEL, SIG_MODELS_INFO
from zigpy.exceptions import MultipleQuirksMatchException
import zigpy.quirks
from zigpy.typing import CustomDeviceType, DeviceType

if TYPE_CHECKING:
from zigpy.quirks.v2 import QuirksV2RegistryEntry

_LOGGER = logging.getLogger(__name__)

TYPE_MANUF_QUIRKS_DICT = typing.Dict[
Expand All @@ -27,10 +24,16 @@


class DeviceRegistry:
"""Device registry for Zigpy quirks."""

def __init__(self, *args, **kwargs) -> None:
"""Initialize the registry."""
self._registry: TYPE_MANUF_QUIRKS_DICT = collections.defaultdict(
lambda: collections.defaultdict(list)
)
self._registry_v2: dict[
tuple[str, str], list[QuirksV2RegistryEntry]
] = collections.defaultdict(list)

def add_to_registry(self, custom_device: CustomDeviceType) -> None:
"""Add a device to the registry"""
Expand All @@ -45,7 +48,18 @@ def add_to_registry(self, custom_device: CustomDeviceType) -> None:
if custom_device not in self.registry[manufacturer][model]:
self.registry[manufacturer][model].insert(0, custom_device)

def add_to_registry_v2(
self, manufacturer: str, model: str, entry: QuirksV2RegistryEntry
):
"""Add an entry to the registry."""
key = (manufacturer, model)
if not entry.registry:
entry.registry = self
self._registry_v2[key].append(entry)
return entry

def remove(self, custom_device: CustomDeviceType) -> None:
"""Remove a device from the registry"""
models_info = custom_device.signature.get(SIG_MODELS_INFO)
if models_info:
for manuf, model in models_info:
Expand All @@ -59,7 +73,26 @@ def get_device(self, device: DeviceType) -> CustomDeviceType | DeviceType:
"""Get a CustomDevice object, if one is available"""
if isinstance(device, zigpy.quirks.CustomDevice):
return device
dev_ep = set(device.endpoints) - {0}

key = (device.manufacturer, device.model)
if key in self._registry_v2:
matches: list[QuirksV2RegistryEntry] = []
entries = self._registry_v2[key]
if len(entries) == 1:
if entries[0].matches_device(device):
matches.append(entries[0])
else:
for entry in entries:
if entry.matches_device(device):
matches.append(entry)
if len(matches) > 1:
raise MultipleQuirksMatchException(
f"Multiple matches found for device {device}: {matches}"
)
if len(matches) == 1:
quirk_entry: QuirksV2RegistryEntry = matches[0]
return quirk_entry.create_device(device)

_LOGGER.debug(
"Checking quirks for %s %s (%s)",
device.manufacturer,
Expand All @@ -72,69 +105,10 @@ def get_device(self, device: DeviceType) -> CustomDeviceType | DeviceType:
self.registry[None][device.model],
self.registry[None][None],
):
matcher = zigpy.quirks.signature_matches(candidate.signature)
_LOGGER.debug("Considering %s", candidate)

if device.model != candidate.signature.get(SIG_MODEL, device.model):
_LOGGER.debug("Fail, because device model mismatch: '%s'", device.model)
continue

if device.manufacturer != candidate.signature.get(
SIG_MANUFACTURER, device.manufacturer
):
_LOGGER.debug(
"Fail, because device manufacturer mismatch: '%s'",
device.manufacturer,
)
continue

sig = candidate.signature.get(SIG_ENDPOINTS)
if sig is None:
continue

if not self._match(sig, dev_ep):
_LOGGER.debug(
"Fail because endpoint list mismatch: %s %s",
set(sig.keys()),
dev_ep,
)
continue

if not all(
device[eid].profile_id
== sig[eid].get(SIG_EP_PROFILE, device[eid].profile_id)
for eid in sig
):
_LOGGER.debug(
"Fail because profile_id mismatch on at least one endpoint"
)
continue

if not all(
device[eid].device_type
== sig[eid].get(SIG_EP_TYPE, device[eid].device_type)
for eid in sig
):
_LOGGER.debug(
"Fail because device_type mismatch on at least one endpoint"
)
continue

if not all(
self._match(device[eid].in_clusters, ep.get(SIG_EP_INPUT, []))
for eid, ep in sig.items()
):
_LOGGER.debug(
"Fail because input cluster mismatch on at least one endpoint"
)
continue

if not all(
self._match(device[eid].out_clusters, ep.get(SIG_EP_OUTPUT, []))
for eid, ep in sig.items()
):
_LOGGER.debug(
"Fail because output cluster mismatch on at least one endpoint"
)
if not matcher(device):
continue

_LOGGER.debug(
Expand All @@ -145,12 +119,9 @@ def get_device(self, device: DeviceType) -> CustomDeviceType | DeviceType:

return device

@staticmethod
def _match(a: dict | typing.Iterable, b: dict | typing.Iterable) -> bool:
return set(a) == set(b)

@property
def registry(self) -> TYPE_MANUF_QUIRKS_DICT:
"""Return the registry."""
return self._registry

def __contains__(self, device: CustomDeviceType) -> bool:
Expand Down