diff --git a/custom_components/localtuya/coordinator.py b/custom_components/localtuya/coordinator.py index 68584a95..cd33451c 100644 --- a/custom_components/localtuya/coordinator.py +++ b/custom_components/localtuya/coordinator.py @@ -256,6 +256,7 @@ def _new_entity_handler(entity_id): self.debug(f"Success: connected to {host}", force=True) if self._sub_devices: + self._interface.start_sub_devices_heartbeat() for subdevice in self._sub_devices.values(): self._hass.async_create_task(subdevice.async_connect()) diff --git a/custom_components/localtuya/core/pytuya/__init__.py b/custom_components/localtuya/core/pytuya/__init__.py index 841d45b9..eb7563c3 100644 --- a/custom_components/localtuya/core/pytuya/__init__.py +++ b/custom_components/localtuya/core/pytuya/__init__.py @@ -177,9 +177,11 @@ class DecodeError(Exception): SESS_KEY_NEG_START, SESS_KEY_NEG_RESP, SESS_KEY_NEG_FINISH, + LAN_EXT_STREAM, ] HEARTBEAT_INTERVAL = 10 +HEARTBEAT_SUB_DEVICES_INTERVAL = 30 # DPS that are known to be safe to use with update_dps (0x12) command UPDATE_DPS_WHITELIST = [18, 19, 20] # Socket (Wi-Fi) @@ -216,6 +218,7 @@ class DecodeError(Exception): CONTROL_NEW: {"command": {"devId": "", "uid": "", "t": "", "cid": ""}}, DP_QUERY_NEW: {"command": {"devId": "", "uid": "", "t": "", "cid": ""}}, UPDATEDPS: {"command": {"dpId": [18, 19, 20], "cid": ""}}, + LAN_EXT_STREAM: {"command": {"reqType": "", "data": {}}}, }, # Special Case Device "0d" - Some of these devices # Require the 0d command as the DP_QUERY status request and the list of @@ -592,7 +595,9 @@ def abort(self): async def wait_for(self, seqno, cmd, timeout=5): """Wait for response to a sequence number to be received and return it.""" if seqno in self.listeners: - raise Exception(f"listener exists for {seqno}") + self.debug(f"listener exists for {seqno}") + if seqno == self.HEARTBEAT_SEQNO: + raise Exception(f"listener exists for {seqno}") self.debug("Command %d waiting for seq. number %d", cmd, seqno) self.listeners[seqno] = asyncio.Semaphore(0) @@ -603,8 +608,9 @@ async def wait_for(self, seqno, cmd, timeout=5): "Command %d timed out waiting for sequence number %d", cmd, seqno ) del self.listeners[seqno] - raise - + raise TimeoutError( + f"Command {cmd} timed out waiting for sequence number {seqno}" + ) return self.listeners.pop(seqno) def add_data(self, data): @@ -653,11 +659,12 @@ def add_data(self, data): def _dispatch(self, msg): """Dispatch a message to someone that is listening.""" - self.debug("Dispatching message CMD %r %s", msg.cmd, msg) + self.debug("Dispatching message CMD %r %s", msg.cmd, msg, force=True) if msg.seqno in self.listeners: self.debug("Dispatching sequence number %d", msg.seqno) self._release_listener(msg.seqno, msg) + if msg.cmd == HEART_BEAT: self.debug("Got heartbeat response") self._release_listener(self.HEARTBEAT_SEQNO, msg) @@ -674,9 +681,12 @@ def _dispatch(self, msg): else: self.debug("Got status update") self.callback_status_update(msg) + elif msg.cmd == LAN_EXT_STREAM and msg.payload: + self.debug(f"Got Sub-devices status update") + self.callback_status_update(msg) else: - if msg.cmd == CONTROL_NEW: - self.debug("Got ACK message for command %d: will ignore it", msg.cmd) + if msg.cmd == CONTROL_NEW or not msg.payload: + self.debug("Got ACK message for command %d: will ignore it %s", msg.cmd) else: self.debug( "Got message type %d for unknown listener %d: %s", @@ -769,6 +779,7 @@ def __init__( self.on_connected = on_connected self.heartbeater = None self.dps_cache = {} + self.sub_devices_states = {} # {"Online": [cid,...], "offline": [cid...]} self.local_nonce = b"0123456789abcdef" # not-so-random random key self.remote_nonce = b"" self.dps_whitelist = UPDATE_DPS_WHITELIST @@ -809,6 +820,22 @@ def _status_update(msg): decoded_message: dict = self._decode_payload(msg.payload) cid = None + # Handle sub-devices states update. + if msg.cmd == LAN_EXT_STREAM: + self.debug(f"Sub-Devices States Update: {decoded_message}") + if (data := decoded_message.get("data")) and isinstance(data, dict): + self.sub_devices_states.update(data) + listener = self.listener and self.listener() + if listener is None: + return + + on_devices = data.get("online", []) + off_devices = data.get("offline", []) + for cid, device in listener._sub_devices.items(): + if cid in off_devices or cid not in on_devices: + device.disconnected() + return + if "dps" not in decoded_message: return @@ -883,9 +910,28 @@ async def heartbeat_loop(): # Prevent duplicates heartbeat task self.heartbeater = self.loop.create_task(heartbeat_loop()) + def start_sub_devices_heartbeat(self): + """Update the states of subdevices every 30sec. this function only be called once.""" + + async def heartbeat_loop(): + """Continuously send heart beat updates.""" + self.debug("Start a heartbeat for sub-devices") + # This will break if main "heartbeat" stopped + while self.heartbeater: + try: + await self.subdevices_query() + await asyncio.sleep(HEARTBEAT_SUB_DEVICES_INTERVAL) + except (Exception, asyncio.CancelledError) as ex: + self.debug(f"Sub-devices heartbeat stopped due to: {ex}") + break + + if self.heartbeater: + # Prevent duplicates heartbeat task + self.loop.create_task(heartbeat_loop()) + def data_received(self, data): """Received data from device.""" - # self.debug("received data=%r", binascii.hexlify(data)) + # self.debug("received data=%r", binascii.hexlify(data), force=True) self.dispatcher.add_data(data) def connection_lost(self, exc): @@ -960,7 +1006,7 @@ async def exchange_quick(self, payload, recv_retries): ) return None - async def exchange(self, command, dps=None, nodeID=None, delay=True): + async def exchange(self, command, dps=None, nodeID=None, delay=True, payload=None): """Send and receive a message, returning response from device.""" if self.version >= 3.4 and self.real_local_key == self.local_key: self.debug("3.4 or 3.5 device: negotiating a new session key") @@ -969,7 +1015,7 @@ async def exchange(self, command, dps=None, nodeID=None, delay=True): self.debug( "Sending command %s (device type: %s) DPS: %s", command, self.dev_type, dps ) - payload = self._generate_payload(command, dps, nodeId=nodeID) + payload = payload or self._generate_payload(command, dps, nodeId=nodeID) real_cmd = payload.cmd dev_type = self.dev_type # self.debug("Exchange: payload %r %r", payload.cmd, payload.payload) @@ -994,7 +1040,7 @@ async def exchange(self, command, dps=None, nodeID=None, delay=True): if real_cmd in [HEART_BEAT, CONTROL, CONTROL_NEW] and len(msg.payload) == 0: # device may send messages with empty payload in response # to a HEART_BEAT or CONTROL or CONTROL_NEW command: consider them an ACK - self.debug("ACK received for command %d: ignoring it", real_cmd) + self.debug(f"ACK received for command {real_cmd}: ignoring: {msg.seqno}") return None payload = self._decode_payload(msg.payload) @@ -1075,6 +1121,15 @@ async def set_dps(self, dps, cid=None): """Set values for a set of datapoints.""" return await self.exchange(CONTROL, dps, nodeID=cid) + async def subdevices_query(self): + """Request a list of sub-devices and their status.""" + # Return payload: {"online": [cid1, ...], "offline": [cid2, ...]} + payload = self._generate_payload( + LAN_EXT_STREAM, rawData={"cids": []}, reqType="subdev_online_stat_query" + ) + + return await self.exchange(command=LAN_EXT_STREAM, payload=payload) + async def detect_available_dps(self, cid=None): """Return which datapoints are supported by the device.""" # type_0d devices need a sort of bruteforce querying in order to detect the @@ -1182,12 +1237,14 @@ def _decode_payload(self, payload): try: json_payload = json.loads(payload) except Exception as ex: + json_payload = self.error_json(ERR_JSON, payload) + if "devid not" in payload: # DeviceID Not found. raise ValueError(f"DeviceID [{self.id}] Not found") - else: - raise DecodeError( - f"[{self.id}]: could not decrypt data: wrong local_key? (exception: {ex}, payload: {payload})" - ) + # else: + # raise DecodeError( + # f"[{self.id}]: could not decrypt data: wrong local_key? (exception: {ex}, payload: {payload})" + # ) # json_payload = self.error_json(ERR_JSON, payload) # v3.4 stuffs it into {"data":{"dps":{"1":true}}, ...} @@ -1341,7 +1398,15 @@ def _encode_message(self, msg): return buffer def _generate_payload( - self, command, data=None, gwId=None, devId=None, uid=None, nodeId=None + self, + command, + data=None, + gwId=None, + devId=None, + uid=None, + nodeId=None, + rawData=None, + reqType=None, ): """ Generate the payload to send. @@ -1429,8 +1494,9 @@ def deepcopy_dict(_dict: dict): json_data["t"] = int(time.time()) else: json_data["t"] = str(int(time.time())) - - if data is not None: + if rawData is not None and "data" in json_data: + json_data["data"] = rawData + elif data is not None: if "dpId" in json_data: json_data["dpId"] = data elif "data" in json_data: @@ -1439,6 +1505,8 @@ def deepcopy_dict(_dict: dict): json_data["dps"] = data elif self.dev_type == "type_0d" and command == DP_QUERY: json_data["dps"] = self.dps_to_request + if reqType and "reqType" in json_data: + json_data["reqType"] = reqType if json_data == "": payload = ""