Skip to content

Commit

Permalink
Add support for ApiV1 XML payload (issue #681)
Browse files Browse the repository at this point in the history
  • Loading branch information
ollo69 committed Feb 11, 2024
1 parent cc5b02d commit a2d16d5
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 47 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ config/*
!config/configuration.yaml

# WideQ test file
custom_components/smartthinq_sensors/wideq/deviceV1.txt
custom_components/smartthinq_sensors/wideq/deviceV2.txt
32 changes: 27 additions & 5 deletions custom_components/smartthinq_sensors/wideq/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import json
import logging
from numbers import Number
import os
from typing import Any

import aiohttp
Expand Down Expand Up @@ -706,6 +707,26 @@ async def _additional_poll(self, poll_interval: int):
except Exception as exc: # pylint: disable=broad-except
_LOGGER.debug("Error calling additional poll V2 methods: %s", exc)

def _load_emul_v1_payload(self):
"""
This is used only for debug.
Load the payload for V1 device from file "deviceV1.txt".
"""
if not self._client.emulation:
return None

data_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "deviceV1.txt"
)
try:
with open(data_file, "r", encoding="utf-8") as emu_payload:
device_v1 = json.load(emu_payload)
except (FileNotFoundError, json.JSONDecodeError):
return None
if ret_val := device_v1.get(self.device_info.device_id):
return str(ret_val).encode()
return None

async def _device_poll(
self,
snapshot_key="",
Expand Down Expand Up @@ -745,7 +766,8 @@ async def _device_poll(
return self._model_info.decode_snapshot(snapshot, snapshot_key)

# ThinQ V1 - Monitor data must be polled """
data = await self._mon.refresh()
if not (data := self._load_emul_v1_payload()):
data = await self._mon.refresh()
if not data:
return None

Expand Down Expand Up @@ -1026,14 +1048,14 @@ def lookup_reference(self, key, ref_key="_comment"):
curr_key, self._data[curr_key], ref_key
)

def lookup_bit_enum(self, key):
def lookup_bit_enum(self, key, *, sub_key=None):
"""Lookup value for a specific key of type bit enum."""
if not self._data:
str_val = ""
else:
str_val = self._data.get(key)
if not str_val:
str_val = self._device.model_info.bit_value(key, self._data)
str_val = self._device.model_info.bit_value(key, self._data, sub_key)

if str_val is None:
return None
Expand All @@ -1051,9 +1073,9 @@ def lookup_bit_enum(self, key):

return ret_val

def lookup_bit(self, key, invert=False):
def lookup_bit(self, key, *, sub_key=None, invert=False):
"""Lookup bit value for a specific key of type enum."""
enum_val = self.lookup_bit_enum(key)
enum_val = self.lookup_bit_enum(key, sub_key=sub_key)
if enum_val is None:
return None
bit_val = LOCAL_LANG_PACK.get(enum_val)
Expand Down
46 changes: 32 additions & 14 deletions custom_components/smartthinq_sensors/wideq/devices/washerDryer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@

POWER_STATUS_KEY = ["State", "state"]

CMD_POWER_OFF = [["Control", "WMControl"], ["Power", "WMOff"], ["Off", None]]
CMD_WAKE_UP = [["Control", "WMWakeup"], ["Operation", "WMWakeup"], ["WakeUp", None]]
CMD_PAUSE = [["Control", "WMControl"], ["Operation", "WMStop"], ["Stop", None]]
CMD_POWER_OFF = [[None, "WMControl"], ["PowerOff", "WMOff"], [None, None]]
CMD_WAKE_UP = [[None, "WMWakeup"], ["OperationWakeUp", "WMWakeup"], [None, None]]
CMD_PAUSE = [[None, "WMControl"], ["OperationStop", "WMStop"], [None, None]]
CMD_REMOTE_START = [
["Control", "WMStart"],
[None, "WMStart"],
["OperationStart", "WMStart"],
["Start", "WMStart"],
]
Expand Down Expand Up @@ -119,6 +119,11 @@ def _state_pause(self):
"""Return native value for pause state."""
return self._get_runstate_key(STATE_WM_PAUSE)

@property
def sub_key(self) -> str | None:
"""Return device sub key."""
return self._sub_key

def getkey(self, key: str | None) -> str | None:
"""Add subkey prefix to a key if required."""
if not (key and self._sub_key):
Expand Down Expand Up @@ -161,6 +166,9 @@ def _update_course_info(self, data, course_id=None):
Save information in the data payload for a specific course
or default course if not already available.
"""
if not data:
return {}

if self.model_info.is_info_v2:
n_course_key = self.model_info.config_value(self.getkey("courseType"))
s_course_key = self.model_info.config_value(self.getkey("smartCourseType"))
Expand All @@ -176,8 +184,9 @@ def _update_course_info(self, data, course_id=None):

# Prepare the course data initializing option for infoV1 device
ret_data = data.copy()
option_keys = self.model_info.option_keys(self._sub_key)
if not self.model_info.is_info_v2:
for opt_name in ["Option1", "Option2"]:
for opt_name in option_keys:
ret_data[opt_name] = data.get(opt_name, "0")

# Search valid course Info
Expand Down Expand Up @@ -205,7 +214,7 @@ def _update_course_info(self, data, course_id=None):
key = func_key.get("value")
data = func_key.get("default")
opt_set = False
for opt_name in ["Option1", "Option2"]:
for opt_name in option_keys:
if opt_name not in ret_data:
continue
opt_val = ret_data[opt_name]
Expand All @@ -224,27 +233,34 @@ def _update_course_info(self, data, course_id=None):

def _prepare_command_v1(self, cmd, key):
"""Prepare command for specific ThinQ1 device."""
encode = cmd.pop("encode", False)

str_data = ""
if "data" in cmd:
str_data = cmd["data"]
option_keys = self.model_info.option_keys(self._sub_key)
status_data = self._update_course_info(self._remote_start_status)

for dt_key, dt_value in status_data.items():
repl_key = f"{{{{{dt_key}}}}}"
if repl_key not in str_data:
continue
# for start command we set initial bit to 1
if key and key == "Start" and dt_key.startswith("Option"):
if key and key == "Start" and dt_key in option_keys:
bit_val = 1 if self._initial_bit_start else 0
new_value = self._update_opt_bit(
dt_key, dt_value, "InitialBit", bit_val
)
if new_value is not None:
dt_value = new_value
str_data = str_data.replace(f"{{{{{dt_key}}}}}", dt_value)
str_data = str_data.replace(repl_key, dt_value)
_LOGGER.debug("Command data content: %s", str_data)
encode = cmd.pop("encode", False)
if encode:
cmd["format"] = "B64"
str_list = json.loads(str_data)
str_data = base64.b64encode(bytes(str_list)).decode("ascii")
cmd["data"] = str_data
return cmd

return {**cmd, "data": str_data}

def _prepare_command_v2(self, cmd, key: str):
"""Prepare command for specific ThinQ2 device."""
Expand Down Expand Up @@ -437,7 +453,7 @@ async def set(
await super().set(
self._getcmdkey(ctrl_key),
self._getcmdkey(command),
key=self._getcmdkey(key),
key=key,
value=value,
data=data,
ctrl_path=ctrl_path,
Expand Down Expand Up @@ -827,7 +843,7 @@ def standby_state(self):
if key := self.get_model_info_key(keys):
status = self.lookup_enum(key)
if not status and not self.is_info_v2:
status = self.lookup_bit(keys[0])
status = self.lookup_bit(keys[0], sub_key=self._device.sub_key)
if not (status or key):
return None
if not status:
Expand All @@ -839,7 +855,9 @@ def _update_bit_features(self):
index = 1 if self.is_info_v2 else 0
for feature, keys in BIT_FEATURES.items():
invert = feature in INVERTED_BITS
status = self.lookup_bit(self._getkeys(keys[index]), invert)
status = self.lookup_bit(
self._getkeys(keys[index]), sub_key=self._device.sub_key, invert=invert
)
self._update_feature(feature, status, False)

def _update_features(self):
Expand Down
116 changes: 88 additions & 28 deletions custom_components/smartthinq_sensors/wideq/model_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import logging
from numbers import Number

import xmltodict

from .const import BIT_OFF, BIT_ON

TYPE_BIT = "bit"
Expand Down Expand Up @@ -156,6 +158,10 @@ def reference_name(self, key, value, ref_key="_comment") -> str | None:
return ref_value.get("name")
return None

def option_keys(self, subkey: str | None = None) -> list:
"""Return a list of available option keys."""
return []

def bit_name(self, key, bit_index) -> str | None:
"""Look up the friendly name for an encoded bit based on the bit index."""
return None
Expand All @@ -164,7 +170,7 @@ def bit_index(self, key, bit_name) -> str | None:
"""Look up the start index for an encoded bit based on friendly name."""
return None

def bit_value(self, key, values) -> str | None:
def bit_value(self, key, values, sub_key=None) -> str | None:
"""
Look up the bit value for a specific key.
Not used in model V2.
Expand Down Expand Up @@ -286,6 +292,21 @@ def default(self, name):
"""Get the default value, if it exists, for a given value."""
return self._data.get("Value", {}).get(name, {}).get("default")

def option_keys(self, subkey: str | None = None) -> list:
"""Return a list of available option keys."""
if not (data := self._data.get("Value")):
return []

opt_key = "Option"
if subkey:
opt_key = subkey + opt_key
ret_keys = []
for i in range(1, 4):
key_id = f"{opt_key}{str(i)}"
if key_id in data:
ret_keys.append(key_id)
return ret_keys

def bit_name(self, key, bit_index) -> str | None:
"""Look up the friendly name for an encoded bit based on the bit index."""
if not (values := self.value(key, [TYPE_BIT])):
Expand All @@ -308,41 +329,39 @@ def bit_index(self, key, bit_name) -> str | None:

return None

def _get_bit_key(self, key):
def _get_bit_key(self, key: str, sub_key: str | None = None):
"""Get bit values for a specific key."""

def search_bit_key():
def search_bit_key(option_keys: list, data: dict | None):
if not data:
return {}
for i in range(1, 4):
opt_key = f"Option{str(i)}"
option = data.get(opt_key)
if not option:
for opt_key in option_keys:
if not (option := data.get(opt_key)):
continue
for opt in option.get("option", []):
if key == opt.get("value", ""):
start_bit = opt.get("startbit")
length = opt.get("length", 1)
if start_bit is None:
return {}
return {
"option": opt_key,
"startbit": start_bit,
"length": length,
}
if key != opt.get("value", ""):
continue
if (start_bit := opt.get("startbit")) is None:
return {}
return {
"option": opt_key,
"startbit": start_bit,
"length": opt.get("length", 1),
}

return {}

bit_key = self._bit_keys.get(key)
if bit_key is None:
data = self._data.get("Value")
bit_key = search_bit_key()
self._bit_keys[key] = bit_key
key_bit = sub_key + key if sub_key else key
if (bit_key := self._bit_keys.get(key_bit)) is None:
option_keys = self.option_keys(sub_key)
bit_key = search_bit_key(option_keys, self._data.get("Value"))
self._bit_keys[key_bit] = bit_key

return bit_key

def bit_value(self, key, values) -> str | None:
def bit_value(self, key, values, sub_key=None) -> str | None:
"""Look up the bit value for an specific key."""
bit_key = self._get_bit_key(key)
bit_key = self._get_bit_key(key, sub_key)
if not bit_key:
return None
value = None if not values else values.get(bit_key["option"])
Expand Down Expand Up @@ -429,11 +448,52 @@ def decode_monitor_hex(self, data):
decoded[key] = str(value)
return decoded

@staticmethod
def decode_monitor_xml(data):
def decode_monitor_xml(self, data):
"""Decode a xml that encodes status data."""
_LOGGER.warning("Received XML data from device: %s", data)
return None

try:
xml_json = xmltodict.parse(data.decode("utf8"))
except Exception as ex: # pylint: disable=broad-except
_LOGGER.warning("Failed to decode XML message: [%s] - error: %s", data, ex)
return None

main_tag: str | None = self._data["Monitoring"].get("tag")
if not main_tag or main_tag not in xml_json:
_LOGGER.warning(
"Invalid root tag [%s] for XML message: [%s]", main_tag, xml_json
)
return None

decoded = {}
dev_vals = xml_json[main_tag]
for item in self._data["Monitoring"]["protocol"]:
tags: str = item["tag"]
tag_list = tags.split(".")
tag_key = tag_list[0]
if len(tag_list) > 1:
value_dict: dict = dev_vals[tag_key]
tag_key = tag_list[1]
else:
value_dict: dict = dev_vals

if val := value_dict.get(tag_key):
key = item["value"]
if isinstance(key, list):
if isinstance(val, str):
sub_val = val.split(",")
else:
sub_val = []
for sub_idx, sub_key in enumerate(key):
if not isinstance(sub_key, str):
continue
decoded[sub_key] = (
sub_val[sub_idx] if len(sub_val) > sub_idx else ""
)

elif isinstance(key, str):
decoded[key] = val

return decoded

@staticmethod
def decode_monitor_json(data, mon_type):
Expand Down

0 comments on commit a2d16d5

Please sign in to comment.