Skip to content

Commit

Permalink
Add heartbeat for sub_devices #194
Browse files Browse the repository at this point in the history
  • Loading branch information
xZetsubou committed May 3, 2024
1 parent 4b0282a commit 5d8dced
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 17 deletions.
1 change: 1 addition & 0 deletions custom_components/localtuya/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def _new_entity_handler(entity_id):
self.debug(f"Success: connected to {host}", force=True)

if self._sub_devices:
self._interface.start_sub_devices_heartbeat()
for subdevice in self._sub_devices.values():
self._hass.async_create_task(subdevice.async_connect())

Expand Down
102 changes: 85 additions & 17 deletions custom_components/localtuya/core/pytuya/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,11 @@ class DecodeError(Exception):
SESS_KEY_NEG_START,
SESS_KEY_NEG_RESP,
SESS_KEY_NEG_FINISH,
LAN_EXT_STREAM,
]

HEARTBEAT_INTERVAL = 10
HEARTBEAT_SUB_DEVICES_INTERVAL = 30

# DPS that are known to be safe to use with update_dps (0x12) command
UPDATE_DPS_WHITELIST = [18, 19, 20] # Socket (Wi-Fi)
Expand Down Expand Up @@ -216,6 +218,7 @@ class DecodeError(Exception):
CONTROL_NEW: {"command": {"devId": "", "uid": "", "t": "", "cid": ""}},
DP_QUERY_NEW: {"command": {"devId": "", "uid": "", "t": "", "cid": ""}},
UPDATEDPS: {"command": {"dpId": [18, 19, 20], "cid": ""}},
LAN_EXT_STREAM: {"command": {"reqType": "", "data": {}}},
},
# Special Case Device "0d" - Some of these devices
# Require the 0d command as the DP_QUERY status request and the list of
Expand Down Expand Up @@ -592,7 +595,9 @@ def abort(self):
async def wait_for(self, seqno, cmd, timeout=5):
"""Wait for response to a sequence number to be received and return it."""
if seqno in self.listeners:
raise Exception(f"listener exists for {seqno}")
self.debug(f"listener exists for {seqno}")
if seqno == self.HEARTBEAT_SEQNO:
raise Exception(f"listener exists for {seqno}")

self.debug("Command %d waiting for seq. number %d", cmd, seqno)
self.listeners[seqno] = asyncio.Semaphore(0)
Expand All @@ -603,8 +608,9 @@ async def wait_for(self, seqno, cmd, timeout=5):
"Command %d timed out waiting for sequence number %d", cmd, seqno
)
del self.listeners[seqno]
raise

raise TimeoutError(
f"Command {cmd} timed out waiting for sequence number {seqno}"
)
return self.listeners.pop(seqno)

def add_data(self, data):
Expand Down Expand Up @@ -653,11 +659,12 @@ def add_data(self, data):
def _dispatch(self, msg):
"""Dispatch a message to someone that is listening."""

self.debug("Dispatching message CMD %r %s", msg.cmd, msg)
self.debug("Dispatching message CMD %r %s", msg.cmd, msg, force=True)

if msg.seqno in self.listeners:
self.debug("Dispatching sequence number %d", msg.seqno)
self._release_listener(msg.seqno, msg)

if msg.cmd == HEART_BEAT:
self.debug("Got heartbeat response")
self._release_listener(self.HEARTBEAT_SEQNO, msg)
Expand All @@ -674,9 +681,12 @@ def _dispatch(self, msg):
else:
self.debug("Got status update")
self.callback_status_update(msg)
elif msg.cmd == LAN_EXT_STREAM and msg.payload:
self.debug(f"Got Sub-devices status update")
self.callback_status_update(msg)
else:
if msg.cmd == CONTROL_NEW:
self.debug("Got ACK message for command %d: will ignore it", msg.cmd)
if msg.cmd == CONTROL_NEW or not msg.payload:
self.debug("Got ACK message for command %d: will ignore it %s", msg.cmd)
else:
self.debug(
"Got message type %d for unknown listener %d: %s",
Expand Down Expand Up @@ -769,6 +779,7 @@ def __init__(
self.on_connected = on_connected
self.heartbeater = None
self.dps_cache = {}
self.sub_devices_states = {} # {"Online": [cid,...], "offline": [cid...]}
self.local_nonce = b"0123456789abcdef" # not-so-random random key
self.remote_nonce = b""
self.dps_whitelist = UPDATE_DPS_WHITELIST
Expand Down Expand Up @@ -809,6 +820,22 @@ def _status_update(msg):
decoded_message: dict = self._decode_payload(msg.payload)
cid = None

# Handle sub-devices states update.
if msg.cmd == LAN_EXT_STREAM:
self.debug(f"Sub-Devices States Update: {decoded_message}")
if (data := decoded_message.get("data")) and isinstance(data, dict):
self.sub_devices_states.update(data)
listener = self.listener and self.listener()
if listener is None:
return

on_devices = data.get("online", [])
off_devices = data.get("offline", [])
for cid, device in listener._sub_devices.items():
if cid in off_devices or cid not in on_devices:
device.disconnected()
return

if "dps" not in decoded_message:
return

Expand Down Expand Up @@ -883,9 +910,28 @@ async def heartbeat_loop():
# Prevent duplicates heartbeat task
self.heartbeater = self.loop.create_task(heartbeat_loop())

def start_sub_devices_heartbeat(self):
"""Update the states of subdevices every 30sec. this function only be called once."""

async def heartbeat_loop():
"""Continuously send heart beat updates."""
self.debug("Start a heartbeat for sub-devices")
# This will break if main "heartbeat" stopped
while self.heartbeater:
try:
await self.subdevices_query()
await asyncio.sleep(HEARTBEAT_SUB_DEVICES_INTERVAL)
except (Exception, asyncio.CancelledError) as ex:
self.debug(f"Sub-devices heartbeat stopped due to: {ex}")
break

if self.heartbeater:
# Prevent duplicates heartbeat task
self.loop.create_task(heartbeat_loop())

def data_received(self, data):
"""Received data from device."""
# self.debug("received data=%r", binascii.hexlify(data))
# self.debug("received data=%r", binascii.hexlify(data), force=True)
self.dispatcher.add_data(data)

def connection_lost(self, exc):
Expand Down Expand Up @@ -960,7 +1006,7 @@ async def exchange_quick(self, payload, recv_retries):
)
return None

async def exchange(self, command, dps=None, nodeID=None, delay=True):
async def exchange(self, command, dps=None, nodeID=None, delay=True, payload=None):
"""Send and receive a message, returning response from device."""
if self.version >= 3.4 and self.real_local_key == self.local_key:
self.debug("3.4 or 3.5 device: negotiating a new session key")
Expand All @@ -969,7 +1015,7 @@ async def exchange(self, command, dps=None, nodeID=None, delay=True):
self.debug(
"Sending command %s (device type: %s) DPS: %s", command, self.dev_type, dps
)
payload = self._generate_payload(command, dps, nodeId=nodeID)
payload = payload or self._generate_payload(command, dps, nodeId=nodeID)
real_cmd = payload.cmd
dev_type = self.dev_type
# self.debug("Exchange: payload %r %r", payload.cmd, payload.payload)
Expand All @@ -994,7 +1040,7 @@ async def exchange(self, command, dps=None, nodeID=None, delay=True):
if real_cmd in [HEART_BEAT, CONTROL, CONTROL_NEW] and len(msg.payload) == 0:
# device may send messages with empty payload in response
# to a HEART_BEAT or CONTROL or CONTROL_NEW command: consider them an ACK
self.debug("ACK received for command %d: ignoring it", real_cmd)
self.debug(f"ACK received for command {real_cmd}: ignoring: {msg.seqno}")
return None
payload = self._decode_payload(msg.payload)

Expand Down Expand Up @@ -1075,6 +1121,15 @@ async def set_dps(self, dps, cid=None):
"""Set values for a set of datapoints."""
return await self.exchange(CONTROL, dps, nodeID=cid)

async def subdevices_query(self):
"""Request a list of sub-devices and their status."""
# Return payload: {"online": [cid1, ...], "offline": [cid2, ...]}
payload = self._generate_payload(
LAN_EXT_STREAM, rawData={"cids": []}, reqType="subdev_online_stat_query"
)

return await self.exchange(command=LAN_EXT_STREAM, payload=payload)

async def detect_available_dps(self, cid=None):
"""Return which datapoints are supported by the device."""
# type_0d devices need a sort of bruteforce querying in order to detect the
Expand Down Expand Up @@ -1182,12 +1237,14 @@ def _decode_payload(self, payload):
try:
json_payload = json.loads(payload)
except Exception as ex:
json_payload = self.error_json(ERR_JSON, payload)

if "devid not" in payload: # DeviceID Not found.
raise ValueError(f"DeviceID [{self.id}] Not found")
else:
raise DecodeError(
f"[{self.id}]: could not decrypt data: wrong local_key? (exception: {ex}, payload: {payload})"
)
# else:
# raise DecodeError(
# f"[{self.id}]: could not decrypt data: wrong local_key? (exception: {ex}, payload: {payload})"
# )
# json_payload = self.error_json(ERR_JSON, payload)

# v3.4 stuffs it into {"data":{"dps":{"1":true}}, ...}
Expand Down Expand Up @@ -1341,7 +1398,15 @@ def _encode_message(self, msg):
return buffer

def _generate_payload(
self, command, data=None, gwId=None, devId=None, uid=None, nodeId=None
self,
command,
data=None,
gwId=None,
devId=None,
uid=None,
nodeId=None,
rawData=None,
reqType=None,
):
"""
Generate the payload to send.
Expand Down Expand Up @@ -1429,8 +1494,9 @@ def deepcopy_dict(_dict: dict):
json_data["t"] = int(time.time())
else:
json_data["t"] = str(int(time.time()))

if data is not None:
if rawData is not None and "data" in json_data:
json_data["data"] = rawData
elif data is not None:
if "dpId" in json_data:
json_data["dpId"] = data
elif "data" in json_data:
Expand All @@ -1439,6 +1505,8 @@ def deepcopy_dict(_dict: dict):
json_data["dps"] = data
elif self.dev_type == "type_0d" and command == DP_QUERY:
json_data["dps"] = self.dps_to_request
if reqType and "reqType" in json_data:
json_data["reqType"] = reqType

if json_data == "":
payload = ""
Expand Down

0 comments on commit 5d8dced

Please sign in to comment.