From d3afe61fea7c216a509fd630dea85dff0728a081 Mon Sep 17 00:00:00 2001 From: "Alan D. Tse" Date: Sat, 15 Feb 2020 01:01:32 -0800 Subject: [PATCH] fix: update websocket subscribe and retry This is the subscribe command from the latest app --- teslajsonpy/connection.py | 112 +++++++++++++++++++++++++++++++++++--- teslajsonpy/const.py | 1 + teslajsonpy/controller.py | 70 +++++++++++++++--------- 3 files changed, 149 insertions(+), 34 deletions(-) diff --git a/teslajsonpy/connection.py b/teslajsonpy/connection.py index 6c1ba464..e6d1aff3 100644 --- a/teslajsonpy/connection.py +++ b/teslajsonpy/connection.py @@ -10,12 +10,14 @@ import datetime import json import logging +import time from typing import Dict, Text import aiohttp from yarl import URL from teslajsonpy.exceptions import IncompleteCredentials, TeslaException +from teslajsonpy.const import DRIVING_INTERVAL, WEBSOCKET_TIMEOUT _LOGGER = logging.getLogger(__name__) @@ -179,6 +181,8 @@ async def websocket_connect(self, vin: int, vehicle_id: int, **kwargs): async def _process_messages() -> None: """Start Async WebSocket Listener.""" + nonlocal last_message_time + nonlocal disconnected async for msg in self.websocket: _LOGGER.debug("msg: %s", msg) if msg.type == aiohttp.WSMsgType.BINARY: @@ -189,6 +193,8 @@ async def _process_messages() -> None: self.websocket_url, vin[-5:], ) + if msg_json["msg_type"] == "data:update": + last_message_time = time.time() if ( msg_json["msg_type"] == "data:error" and msg_json["value"] == "Can't validate token. " @@ -202,23 +208,111 @@ async def _process_messages() -> None: ): if kwargs.get("on_disconnect"): kwargs.get("on_disconnect")(msg_json) + disconnected = True if kwargs.get("on_message"): kwargs.get("on_message")(msg_json) elif msg.type == aiohttp.WSMsgType.ERROR: _LOGGER.debug("WSMsgType error") break + disconnected = False + last_message_time = time.time() + timeout = last_message_time + DRIVING_INTERVAL if not self.websocket or self.websocket.closed: _LOGGER.debug("Connecting to websocket %s", self.websocket_url) self.websocket = await self.websession.ws_connect(self.websocket_url) loop = asyncio.get_event_loop() loop.create_task(_process_messages()) - _LOGGER.debug("%s:Trying to subscribe to websocket", vin[-5:]) - await self.websocket.send_json( - data={ - "msg_type": "data:subscribe_oauth", - "token": self.access_token, - "value": "speed,odometer,soc,elevation,est_heading,est_lat,est_lng,power,shift_state,range,est_range,heading", - "tag": f"{vehicle_id}", - } - ) + while not ( + disconnected + or time.time() - last_message_time > WEBSOCKET_TIMEOUT + or time.time() > timeout + ): + _LOGGER.debug("%s:Trying to subscribe to websocket", vin[-5:]) + await self.websocket.send_json( + data={ + "msg_type": "data:subscribe_oauth", + "token": self.access_token, + "value": "shift_state,speed,power,est_lat,est_lng,est_heading,est_corrected_lat,est_corrected_lng,native_latitude,native_longitude,native_heading,native_type,native_location_supported", + # "value": "speed,odometer,soc,elevation,est_heading,est_lat,est_lng,power,shift_state,range,est_range,heading", + # old values + "tag": f"{vehicle_id}", + "created:timestamp": round(time.time() * 1000), + } + ) + await asyncio.sleep(WEBSOCKET_TIMEOUT - 1) + + # async def websocket_connect2(self, vin: int, vehicle_id: int, **kwargs): + # """Connect to Tesla streaming websocket. + + # Args: + # vin (int): vin of vehicle + # vehicle_id (int): vehicle_id from Tesla api + # on_message (function): function to call on a valid message. It must + # process a json delivered in data + # on_disconnect (function): function to call on a disconnect message. It must + # process a json delivered in data + + # """ + + # async def _process_messages() -> None: + # """Start Async WebSocket Listener.""" + # async for msg in self.websocket[vin]["websocket"]: + # _LOGGER.debug("%s:msg: %s", vin[-5:], msg) + # if msg.type == aiohttp.WSMsgType.BINARY: + # msg_json = json.loads(msg.data) + # if msg_json["msg_type"] == "control:hello": + # _LOGGER.debug( + # "%s:Succesfully connected to websocket %s on %s", + # vin[-5:], + # self.websocket_url, + # task, + # ) + # if ( + # msg_json["msg_type"] == "data:error" + # and msg_json["value"] == "Can't validate token. " + # ): + # raise TeslaException( + # "Can't validate token for websocket connection." + # ) + # if ( + # msg_json["msg_type"] == "data:error" + # and msg_json["value"] == "disconnected" + # ): + # if self.websocket[vin].kwargs.get("on_disconnect"): + # self.websocket[vin].kwargs.get("on_disconnect")() + # self.websocket[vin].pop(None) + # _LOGGER.debug( + # "%s:Disconnecting from websocket on %s", vin[-5:], task + # ) + # await self.websocket[vin]["websocket"].close() + # if kwargs.get("on_message"): + # kwargs.get("on_message")(msg_json) + # elif msg.type == aiohttp.WSMsgType.ERROR: + # _LOGGER.debug("WSMsgType error") + # break + + # self.websocket.setdefault(vin, {"websocket": None, "kwargs": kwargs}) + # if ( + # not self.websocket[vin]["websocket"] + # or self.websocket[vin]["websocket"].closed + # ): + # _LOGGER.debug("%s:Connecting to websocket %s", vin[-5:], self.websocket_url) + # self.websocket[vin]["websocket"] = await self.websession.ws_connect( + # self.websocket_url + # ) + # loop = asyncio.get_event_loop() + # task = loop.create_task(_process_messages()) + # _LOGGER.debug( + # "%s:Trying to subscribe to websocket: %s", vin[-5:], self.access_token + # ) + + # await self.websocket[vin]["websocket"].send_json( + # data={ + # "msg_type": "data:subscribe_oauth", + # # "token": "self.access_token", + # "token": self.access_token, + # "value": "speed,odometer,soc,elevation,est_heading,est_lat,est_lng,power,shift_state,range,est_range,heading", + # "tag": f"{vehicle_id}", + # } + # ) diff --git a/teslajsonpy/const.py b/teslajsonpy/const.py index ffd4dd9f..a755d201 100644 --- a/teslajsonpy/const.py +++ b/teslajsonpy/const.py @@ -9,3 +9,4 @@ ONLINE_INTERVAL = 60 # interval for checking online state; does not hit individual cars SLEEP_INTERVAL = 660 # interval required to let vehicle sleep; based on testing DRIVING_INTERVAL = 60 # interval when driving detected +WEBSOCKET_TIMEOUT = 11 # time for websocket to timeout diff --git a/teslajsonpy/controller.py b/teslajsonpy/controller.py index 97be77be..6b104a6e 100644 --- a/teslajsonpy/controller.py +++ b/teslajsonpy/controller.py @@ -802,20 +802,28 @@ def _process_websocket_message(self, data): update_json = {} vehicle_id = int(data["tag"]) vin = self.__vehicle_id_vin_map[vehicle_id] + # shift_state,speed,power,est_lat,est_lng,est_heading,est_corrected_lat,est_corrected_lng, + # native_latitude,native_longitude,native_heading,native_type,native_location_supported keys = [ ("timestamp", int), + ("shift_state", str), ("speed", int), - ("odometer", float), - ("soc", int), - ("elevation", int), - ("est_heading", int), + ("power", int), ("est_lat", float), ("est_lng", float), - ("power", int), - ("shift_state", str), - ("range", int), - ("est_range", int), - ("heading", int), + ("est_heading", int), + ("est_corrected_lat", float), + ("est_corrected_lng", float), + ("native_latitude", float), + ("native_longitude", float), + ("native_heading", float), + ("native_type", str), + ("native_location_supported", int), + # ("soc", int), + # ("elevation", int), + # ("range", int), + # ("est_range", int), + # ("heading", int), ] values = data["value"].split(",") try: @@ -823,20 +831,10 @@ def _process_websocket_message(self, data): update_json[keys[num][0]] = keys[num][1](value) if value else None _LOGGER.debug("Updating %s with websocket: %s", vin[-5:], update_json) self.__driving[vin]["timestamp"] = update_json["timestamp"] - self.__charging[vin]["timestamp"] = update_json["timestamp"] - self.__state[vin]["timestamp"] = update_json["timestamp"] - self.__driving[vin]["speed"] = update_json["speed"] - self.__state[vin]["odometer"] = update_json["odometer"] - self.__charging[vin]["battery_level"] = update_json["soc"] - # self.__state[vin]["odometer"] = update_json["elevation"] - # no current elevation stored - self.__driving[vin]["heading"] = update_json["est_heading"] - self.__driving[vin]["latitude"] = update_json["est_lat"] - self.__driving[vin]["longitude"] = update_json["est_lng"] - self.__driving[vin]["power"] = update_json["power"] if ( self.__driving[vin].get("shift_state") - and self.__driving[vin].get("shift_state") != update_json["shift_state"] + and self.__driving[vin].get("shift_state") + != update_json["shift_state"] and ( update_json["shift_state"] is None or update_json["shift_state"] == "P" @@ -844,12 +842,34 @@ def _process_websocket_message(self, data): ): self.__last_parked_timestamp[vin] = update_json["timestamp"] / 1000 self.__driving[vin]["shift_state"] = update_json["shift_state"] - self.__charging[vin]["battery_range"] = update_json["range"] - self.__charging[vin]["est_battery_range"] = update_json["est_range"] + self.__driving[vin]["speed"] = update_json["speed"] + self.__driving[vin]["power"] = update_json["power"] + self.__driving[vin]["latitude"] = update_json["est_corrected_lat"] + self.__driving[vin]["longitude"] = update_json["est_corrected_lng"] + self.__driving[vin]["heading"] = update_json["est_heading"] + self.__driving[vin]["native_latitude"] = update_json["native_latitude"] + self.__driving[vin]["native_longitude"] = update_json[ + "native_longitude" + ] + self.__driving[vin]["native_type"] = update_json["native_type"] + self.__driving[vin]["native_location_supported"] = update_json[ + "native_location_supported" + ] + # old values + # self.__charging[vin]["timestamp"] = update_json["timestamp"] + # self.__state[vin]["timestamp"] = update_json["timestamp"] + # self.__state[vin]["odometer"] = update_json["odometer"] + # self.__charging[vin]["battery_level"] = update_json["soc"] + # self.__state[vin]["odometer"] = update_json["elevation"] + # no current elevation stored + # self.__charging[vin]["battery_range"] = update_json["range"] + # self.__charging[vin]["est_battery_range"] = update_json["est_range"] # self.__driving[vin]["heading"] = update_json["heading"] # est_heading appears more accurate - except ValueError: - _LOGGER.debug("Websocket for %s malformed: %s", vin[-5:], values) + except ValueError as ex: + _LOGGER.debug( + "Websocket for %s malformed: %s\n%s", vin[-5:], values, ex + ) for func in self.__websocket_listeners: func(data)