Skip to content

Commit

Permalink
fix: update websocket subscribe and retry
Browse files Browse the repository at this point in the history
This is the subscribe command from the latest app
  • Loading branch information
alandtse committed Feb 15, 2020
1 parent 690af0e commit d3afe61
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 34 deletions.
112 changes: 103 additions & 9 deletions teslajsonpy/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import datetime
import json
import logging
import time
from typing import Dict, Text

import aiohttp
from yarl import URL

from teslajsonpy.exceptions import IncompleteCredentials, TeslaException
from teslajsonpy.const import DRIVING_INTERVAL, WEBSOCKET_TIMEOUT

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -179,6 +181,8 @@ async def websocket_connect(self, vin: int, vehicle_id: int, **kwargs):

async def _process_messages() -> None:
"""Start Async WebSocket Listener."""
nonlocal last_message_time
nonlocal disconnected
async for msg in self.websocket:
_LOGGER.debug("msg: %s", msg)
if msg.type == aiohttp.WSMsgType.BINARY:
Expand All @@ -189,6 +193,8 @@ async def _process_messages() -> None:
self.websocket_url,
vin[-5:],
)
if msg_json["msg_type"] == "data:update":
last_message_time = time.time()
if (
msg_json["msg_type"] == "data:error"
and msg_json["value"] == "Can't validate token. "
Expand All @@ -202,23 +208,111 @@ async def _process_messages() -> None:
):
if kwargs.get("on_disconnect"):
kwargs.get("on_disconnect")(msg_json)
disconnected = True
if kwargs.get("on_message"):
kwargs.get("on_message")(msg_json)
elif msg.type == aiohttp.WSMsgType.ERROR:
_LOGGER.debug("WSMsgType error")
break

disconnected = False
last_message_time = time.time()
timeout = last_message_time + DRIVING_INTERVAL
if not self.websocket or self.websocket.closed:
_LOGGER.debug("Connecting to websocket %s", self.websocket_url)
self.websocket = await self.websession.ws_connect(self.websocket_url)
loop = asyncio.get_event_loop()
loop.create_task(_process_messages())
_LOGGER.debug("%s:Trying to subscribe to websocket", vin[-5:])
await self.websocket.send_json(
data={
"msg_type": "data:subscribe_oauth",
"token": self.access_token,
"value": "speed,odometer,soc,elevation,est_heading,est_lat,est_lng,power,shift_state,range,est_range,heading",
"tag": f"{vehicle_id}",
}
)
while not (
disconnected
or time.time() - last_message_time > WEBSOCKET_TIMEOUT
or time.time() > timeout
):
_LOGGER.debug("%s:Trying to subscribe to websocket", vin[-5:])
await self.websocket.send_json(
data={
"msg_type": "data:subscribe_oauth",
"token": self.access_token,
"value": "shift_state,speed,power,est_lat,est_lng,est_heading,est_corrected_lat,est_corrected_lng,native_latitude,native_longitude,native_heading,native_type,native_location_supported",
# "value": "speed,odometer,soc,elevation,est_heading,est_lat,est_lng,power,shift_state,range,est_range,heading",
# old values
"tag": f"{vehicle_id}",
"created:timestamp": round(time.time() * 1000),
}
)
await asyncio.sleep(WEBSOCKET_TIMEOUT - 1)

# async def websocket_connect2(self, vin: int, vehicle_id: int, **kwargs):
# """Connect to Tesla streaming websocket.

# Args:
# vin (int): vin of vehicle
# vehicle_id (int): vehicle_id from Tesla api
# on_message (function): function to call on a valid message. It must
# process a json delivered in data
# on_disconnect (function): function to call on a disconnect message. It must
# process a json delivered in data

# """

# async def _process_messages() -> None:
# """Start Async WebSocket Listener."""
# async for msg in self.websocket[vin]["websocket"]:
# _LOGGER.debug("%s:msg: %s", vin[-5:], msg)
# if msg.type == aiohttp.WSMsgType.BINARY:
# msg_json = json.loads(msg.data)
# if msg_json["msg_type"] == "control:hello":
# _LOGGER.debug(
# "%s:Succesfully connected to websocket %s on %s",
# vin[-5:],
# self.websocket_url,
# task,
# )
# if (
# msg_json["msg_type"] == "data:error"
# and msg_json["value"] == "Can't validate token. "
# ):
# raise TeslaException(
# "Can't validate token for websocket connection."
# )
# if (
# msg_json["msg_type"] == "data:error"
# and msg_json["value"] == "disconnected"
# ):
# if self.websocket[vin].kwargs.get("on_disconnect"):
# self.websocket[vin].kwargs.get("on_disconnect")()
# self.websocket[vin].pop(None)
# _LOGGER.debug(
# "%s:Disconnecting from websocket on %s", vin[-5:], task
# )
# await self.websocket[vin]["websocket"].close()
# if kwargs.get("on_message"):
# kwargs.get("on_message")(msg_json)
# elif msg.type == aiohttp.WSMsgType.ERROR:
# _LOGGER.debug("WSMsgType error")
# break

# self.websocket.setdefault(vin, {"websocket": None, "kwargs": kwargs})
# if (
# not self.websocket[vin]["websocket"]
# or self.websocket[vin]["websocket"].closed
# ):
# _LOGGER.debug("%s:Connecting to websocket %s", vin[-5:], self.websocket_url)
# self.websocket[vin]["websocket"] = await self.websession.ws_connect(
# self.websocket_url
# )
# loop = asyncio.get_event_loop()
# task = loop.create_task(_process_messages())
# _LOGGER.debug(
# "%s:Trying to subscribe to websocket: %s", vin[-5:], self.access_token
# )

# await self.websocket[vin]["websocket"].send_json(
# data={
# "msg_type": "data:subscribe_oauth",
# # "token": "self.access_token",
# "token": self.access_token,
# "value": "speed,odometer,soc,elevation,est_heading,est_lat,est_lng,power,shift_state,range,est_range,heading",
# "tag": f"{vehicle_id}",
# }
# )
1 change: 1 addition & 0 deletions teslajsonpy/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
ONLINE_INTERVAL = 60 # interval for checking online state; does not hit individual cars
SLEEP_INTERVAL = 660 # interval required to let vehicle sleep; based on testing
DRIVING_INTERVAL = 60 # interval when driving detected
WEBSOCKET_TIMEOUT = 11 # time for websocket to timeout
70 changes: 45 additions & 25 deletions teslajsonpy/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,54 +802,74 @@ def _process_websocket_message(self, data):
update_json = {}
vehicle_id = int(data["tag"])
vin = self.__vehicle_id_vin_map[vehicle_id]
# shift_state,speed,power,est_lat,est_lng,est_heading,est_corrected_lat,est_corrected_lng,
# native_latitude,native_longitude,native_heading,native_type,native_location_supported
keys = [
("timestamp", int),
("shift_state", str),
("speed", int),
("odometer", float),
("soc", int),
("elevation", int),
("est_heading", int),
("power", int),
("est_lat", float),
("est_lng", float),
("power", int),
("shift_state", str),
("range", int),
("est_range", int),
("heading", int),
("est_heading", int),
("est_corrected_lat", float),
("est_corrected_lng", float),
("native_latitude", float),
("native_longitude", float),
("native_heading", float),
("native_type", str),
("native_location_supported", int),
# ("soc", int),
# ("elevation", int),
# ("range", int),
# ("est_range", int),
# ("heading", int),
]
values = data["value"].split(",")
try:
for num, value in enumerate(values):
update_json[keys[num][0]] = keys[num][1](value) if value else None
_LOGGER.debug("Updating %s with websocket: %s", vin[-5:], update_json)
self.__driving[vin]["timestamp"] = update_json["timestamp"]
self.__charging[vin]["timestamp"] = update_json["timestamp"]
self.__state[vin]["timestamp"] = update_json["timestamp"]
self.__driving[vin]["speed"] = update_json["speed"]
self.__state[vin]["odometer"] = update_json["odometer"]
self.__charging[vin]["battery_level"] = update_json["soc"]
# self.__state[vin]["odometer"] = update_json["elevation"]
# no current elevation stored
self.__driving[vin]["heading"] = update_json["est_heading"]
self.__driving[vin]["latitude"] = update_json["est_lat"]
self.__driving[vin]["longitude"] = update_json["est_lng"]
self.__driving[vin]["power"] = update_json["power"]
if (
self.__driving[vin].get("shift_state")
and self.__driving[vin].get("shift_state") != update_json["shift_state"]
and self.__driving[vin].get("shift_state")
!= update_json["shift_state"]
and (
update_json["shift_state"] is None
or update_json["shift_state"] == "P"
)
):
self.__last_parked_timestamp[vin] = update_json["timestamp"] / 1000
self.__driving[vin]["shift_state"] = update_json["shift_state"]
self.__charging[vin]["battery_range"] = update_json["range"]
self.__charging[vin]["est_battery_range"] = update_json["est_range"]
self.__driving[vin]["speed"] = update_json["speed"]
self.__driving[vin]["power"] = update_json["power"]
self.__driving[vin]["latitude"] = update_json["est_corrected_lat"]
self.__driving[vin]["longitude"] = update_json["est_corrected_lng"]
self.__driving[vin]["heading"] = update_json["est_heading"]
self.__driving[vin]["native_latitude"] = update_json["native_latitude"]
self.__driving[vin]["native_longitude"] = update_json[
"native_longitude"
]
self.__driving[vin]["native_type"] = update_json["native_type"]
self.__driving[vin]["native_location_supported"] = update_json[
"native_location_supported"
]
# old values
# self.__charging[vin]["timestamp"] = update_json["timestamp"]
# self.__state[vin]["timestamp"] = update_json["timestamp"]
# self.__state[vin]["odometer"] = update_json["odometer"]
# self.__charging[vin]["battery_level"] = update_json["soc"]
# self.__state[vin]["odometer"] = update_json["elevation"]
# no current elevation stored
# self.__charging[vin]["battery_range"] = update_json["range"]
# self.__charging[vin]["est_battery_range"] = update_json["est_range"]
# self.__driving[vin]["heading"] = update_json["heading"]
# est_heading appears more accurate
except ValueError:
_LOGGER.debug("Websocket for %s malformed: %s", vin[-5:], values)
except ValueError as ex:
_LOGGER.debug(
"Websocket for %s malformed: %s\n%s", vin[-5:], values, ex
)
for func in self.__websocket_listeners:
func(data)

Expand Down

0 comments on commit d3afe61

Please sign in to comment.