Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ jobs:
run: isort --check scripts custom_components matter_server/client matter_server/common matter_server/server
- name: test show stored node script
run: python3 -m scripts.show_stored_node tests/fixtures/nodes/lighting-example-app.json
- name: test dump fixer script
run: python3 -m scripts.dump_fixer tests/fixtures/nodes/lighting-example-app.json
- name: Pytest
run: pytest tests
69 changes: 69 additions & 0 deletions scripts/dump_fixer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Help patch bad dumps.

When a dump doesn't match the expected TLV data from the Matter SDK,
we don't dump it as a correct Matter class, but a collection of TLVValues.

This script will look up that data and try to re-construct the data.
"""
import json
from pprint import pprint

from matter_server.common import json_utils
from matter_server.vendor.chip.clusters import Objects as all_clusters

from .show_stored_node import PrintButFirst, get_nodes


def process_node(node):
    """Print reconstructed attribute data for clusters that failed TLV parsing.

    Clusters that decoded correctly are Matter objects and are skipped;
    anything still stored as a plain dict of TLV values gets its fields
    mapped back to labels via the cluster descriptor and dumped as JSON.
    """
    print(f"Node {node.node_id}")

    attributes = node.raw_data["attributes"]
    for endpoint_id, endpoint_data in attributes.items():
        for cluster_name, cluster_info in endpoint_data.items():
            # Successfully parsed clusters are Matter objects; only the
            # failures are left behind as plain dicts.
            if not isinstance(cluster_info, dict):
                continue

            print(
                f"** Found unprocessed cluster at endpoint {endpoint_id}: {cluster_name}"
            )
            print(f"Reason: {cluster_info['Reason']}")
            print()

            cluster_cls = getattr(all_clusters, cluster_name)
            # Map the stringified TLV tag back to the descriptor's label.
            tag_to_label = {
                str(field.Tag): field.Label
                for field in cluster_cls.descriptor.Fields
            }

            fixed = {}
            bad = {}
            for tag, value in cluster_info["TLVValue"].items():
                if tag not in tag_to_label:
                    # Unknown tag: keep it under its raw id for inspection.
                    bad[tag] = value
                elif isinstance(value, dict) and "TLVValue" in value:
                    # Nested unparsed TLV data — can't reconstruct it here.
                    bad[tag_to_label[tag]] = value
                else:
                    fixed[tag_to_label[tag]] = value

            fixed["_type"] = f"chip.clusters.Objects.{cluster_name}"
            print(json.dumps(fixed, cls=json_utils.CHIPJSONEncoder, indent=2))

            if bad:
                print()
                print("Bad fields:")
                pprint(bad)

            print()


def main():
    """Run the dump fixer over every node loaded from the input file."""
    # Two blank lines between nodes, none before the first one.
    spacer = PrintButFirst(2)

    for node in get_nodes():
        spacer()
        process_node(node)


if __name__ == "__main__":
    main()
61 changes: 39 additions & 22 deletions scripts/show_stored_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,31 @@

import dataclasses
import json
import logging
import os
import pathlib
import sys
from unittest.mock import Mock

from custom_components.matter_experimental.device_platform import DEVICE_PLATFORM
from matter_server.client.model.device import MatterDevice
from matter_server.client.model.node import MatterNode
from matter_server.common import json_utils

from tests.test_utils.mock_matter import get_mock_matter


class PrintButFirst:
    """Callable that prints separator blank lines before every call but the first.

    Used to space out printed items without emitting a leading blank block.
    """

    def __init__(self, lines: int = 1) -> None:
        # Number of blank lines to print between items.
        self.lines = lines
        # Original kept ``first`` as a class attribute; per-instance state
        # belongs on the instance so independent printers never interact.
        self.first = True

    def __call__(self) -> None:
        if self.first:
            self.first = False
        else:
            for _ in range(self.lines):
                print()


def resolve_input():
if len(sys.argv) < 2:
Expand All @@ -26,12 +40,10 @@ def resolve_input():

def print_node(node: MatterNode):
print(node)
first = True
item_space_printer = PrintButFirst()

for device in node.devices:
if first:
first = False
else:
print()
item_space_printer()
print_device(device)


Expand Down Expand Up @@ -89,28 +101,33 @@ def print_device(device: MatterDevice):
print(" ** WARNING: NOT MAPPED IN HOME ASSISTANT")


def main():
raw_data = resolve_input()
data = json.loads(raw_data, cls=json_utils.CHIPJSONDecoder)
def parse_data(data):
    """Decode raw JSON text into objects using the CHIP-aware decoder."""
    decoded = json.loads(data, cls=json_utils.CHIPJSONDecoder)
    return decoded


def nodes_from_data(data):
# This is a HA storage file. Extract nodes
if "key" in data and data["key"].startswith("matter_experimental_"):
nodes = [d for d in data["data"]["nodes"].values() if d is not None]
else:
nodes = [data]
return [d for d in data["data"]["nodes"].values() if d is not None]

first = True
return [data]

mock_matter = Mock(adapter=Mock(logger=logging.getLogger("show_mappings")))

for node_data in nodes:
if first:
first = False
else:
print()
print()
def get_nodes():
    """Load the input file and return its contents as MatterNode objects."""
    matter = get_mock_matter()
    node_dicts = nodes_from_data(parse_data(resolve_input()))
    return [MatterNode(matter, node_data) for node_data in node_dicts]


def main():
item_space_printer = PrintButFirst(2)

print_node(MatterNode(mock_matter, node_data))
for node in get_nodes():
item_space_printer()
print_node(node)


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions tests/test_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Testing utilities."""
7 changes: 7 additions & 0 deletions tests/test_utils/mock_matter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Mock Matter."""
import logging
from unittest.mock import Mock


def get_mock_matter():
    """Return a mocked Matter client whose adapter logs to "mock_matter"."""
    logger = logging.getLogger("mock_matter")
    return Mock(adapter=Mock(logger=logger))