Skip to content

Commit

Permalink
Merge pull request #165 from zabuldon/dev
Browse files Browse the repository at this point in the history
2021-03-01
  • Loading branch information
alandtse committed Mar 2, 2021
2 parents 8cfbc43 + 395d641 commit afcc0ad
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 248 deletions.
2 changes: 1 addition & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ aiohttp = "*"
backoff = "*"
beautifulsoup4 = "*"
wrapt = "*"
authcaptureproxy = "~=0.5.0"
authcaptureproxy = "~=0.6.0"

[pipenv]
allow_prereleases = true
469 changes: 245 additions & 224 deletions Pipfile.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# requirements. To emit only development requirements, pass "--dev-only".

-i https://pypi.python.org/simple
aiohttp==3.7.3
aiohttp==3.7.4
alabaster==0.7.12
appdirs==1.4.4
astroid==2.4.2; python_version >= '3.5'
Expand Down
56 changes: 40 additions & 16 deletions teslajsonpy/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
import secrets
import time
from typing import Dict, Text
from bs4 import BeautifulSoup
import yarl

import aiohttp
from bs4 import BeautifulSoup
import yarl
from yarl import URL

from teslajsonpy.const import DRIVING_INTERVAL, WEBSOCKET_TIMEOUT
from teslajsonpy.const import (
API_URL,
AUTH_DOMAIN,
DRIVING_INTERVAL,
WEBSOCKET_TIMEOUT,
WS_URL,
)
from teslajsonpy.exceptions import IncompleteCredentials, TeslaException

_LOGGER = logging.getLogger(__name__)
Expand All @@ -51,8 +57,8 @@ def __init__(
self.client_secret: Text = (
"c7257eb71a564034f9419ee651c7d0e5f7" "aa6bfbd18bafb5c5c033b093bb2fa3"
)
self.baseurl: Text = "https://owner-api.teslamotors.com"
self.websocket_url: Text = "wss://streaming.vn.teslamotors.com/streaming"
self.baseurl: Text = API_URL
self.websocket_url: Text = WS_URL
self.api: Text = "/api/1/"
self.expiration: int = expiration
self.access_token = access_token
Expand All @@ -75,6 +81,8 @@ def __init__(
self.__sethead(access_token=self.access_token, expiration=self.expiration)
_LOGGER.debug("Connecting with existing access token")
self.websocket = None
self.mfa_code: Text = ""
self.auth_domain: URL = URL(AUTH_DOMAIN)

async def get(self, command):
"""Get data from API."""
Expand All @@ -101,7 +109,7 @@ async def post(self, command, method="post", data=None):
if self.email and self.password:
_LOGGER.debug("Getting sso auth code using credentials")
self.code = await self.get_authorization_code(
self.email, self.password
self.email, self.password, mfa_code=self.mfa_code
)
else:
_LOGGER.debug("Using existing authorization code")
Expand All @@ -114,10 +122,10 @@ async def post(self, command, method="post", data=None):
refresh_token=self.sso_oauth.get("refresh_token")
)
if auth and all(
[
(
auth.get(item)
for item in ["access_token", "refresh_token", "expires_in"]
]
)
):
self.sso_oauth = {
"access_token": auth["access_token"],
Expand Down Expand Up @@ -382,8 +390,22 @@ async def get_authorization_code(
_LOGGER.debug("No email or password for login; unable to login.")
return
url = self.get_authorization_code_link(new=True)
resp = await self.websession.get(url)
resp = await self.websession.get(url.update_query({"login_hint": email}))
html = await resp.text()
if resp.history:
for item in resp.history:
if (
item.status in [301, 302, 303, 304, 305, 306, 307, 308]
and resp.url.host != self.auth_domain.host
):
_LOGGER.debug(
"Detected %s redirect from %s to %s; changing proxy host",
item.status,
item.url.host,
resp.url.host,
)
self.auth_domain = self.auth_domain.with_host(str(resp.url.host))
url = self.get_authorization_code_link()
soup: BeautifulSoup = BeautifulSoup(html, "html.parser")
data = get_inputs(soup)
data["identity"] = email
Expand All @@ -396,8 +418,9 @@ async def get_authorization_code(
if not resp.history:
html = await resp.text()
if "/mfa/verify" in html:
_LOGGER.debug("Detected MFA request")
mfa_resp = await self.websession.get(
"https://auth.tesla.com/oauth2/v3/authorize/mfa/factors",
self.auth_domain.with_path("/oauth2/v3/authorize/mfa/factors"),
params={"transaction_id": transaction_id},
)
_process_resp(mfa_resp)
Expand All @@ -416,7 +439,7 @@ async def get_authorization_code(
# ]
# }
mfa_json = await mfa_resp.json()
if len(mfa_json.get("data", [])) > 1:
if len(mfa_json.get("data", [])) >= 1:
factor_id = mfa_json["data"][mfa_device]["id"]
if not mfa_code:
_LOGGER.debug("No MFA provided")
Expand All @@ -425,7 +448,7 @@ async def get_authorization_code(
"MFA Code missing", devices=mfa_json["data"]
)
mfa_resp = await self.websession.post(
"https://auth.tesla.com/oauth2/v3/authorize/mfa/verify",
self.auth_domain.with_path("/oauth2/v3/authorize/mfa/verify"),
json={
"transaction_id": transaction_id,
"factor_id": factor_id,
Expand All @@ -445,10 +468,11 @@ async def get_authorization_code(
resp = await self.websession.post(url, data=data)
_process_resp(resp)
await asyncio.sleep(3)
if not (resp.history):
if not resp.history or not URL(resp.history[-1].url).query.get("code"):
_LOGGER.debug("Failed to authenticate")
raise IncompleteCredentials("Unable to login with credentials")
code_url = URL(resp.history[-1].url)
_LOGGER.debug("Found code %s", code_url.query.get("code"))
return code_url.query.get("code")

def get_authorization_code_link(self, new=False) -> yarl.URL:
Expand All @@ -472,7 +496,7 @@ def get_authorization_code_link(self, new=False) -> yarl.URL:
"scope": "openid email offline_access",
"state": state,
}
url = yarl.URL("https://auth.tesla.com/oauth2/v3/authorize")
url = self.auth_domain.with_path("/oauth2/v3/authorize")
url = url.update_query(query)
return url

Expand All @@ -491,7 +515,7 @@ async def get_sso_auth_token(self, code):
"redirect_uri": "https://auth.tesla.com/void/callback",
}
auth = await self.websession.post(
"https://auth.tesla.com/oauth2/v3/token",
self.auth_domain.with_path("/oauth2/v3/token"),
data=oauth,
)
return await auth.json()
Expand All @@ -510,7 +534,7 @@ async def refresh_access_token(self, refresh_token):
"scope": "openid email offline_access",
}
auth = await self.websession.post(
"https://auth.tesla.com/oauth2/v3/token",
self.auth_domain.with_path("/oauth2/v3/token"),
data=oauth,
)
return await auth.json()
Expand Down
3 changes: 3 additions & 0 deletions teslajsonpy/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@
DRIVING_INTERVAL = 60 # interval when driving detected
WEBSOCKET_TIMEOUT = 11 # time for websocket to timeout
RELEASE_NOTES_URL = "https://teslascope.com/teslapedia/software/"
AUTH_DOMAIN = "https://auth.tesla.com"
API_URL = "https://owner-api.teslamotors.com"
WS_URL = "wss://streaming.vn.teslamotors.com/streaming"
23 changes: 20 additions & 3 deletions teslajsonpy/controller.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# SPDX-License-Identifier: Apache-2.0
"""
Python Package for controlling Tesla API.
SPDX-License-Identifier: Apache-2.0
Controller to control access to the Tesla API.
For more details about this api, please refer to the documentation at
Expand All @@ -10,7 +11,7 @@
import asyncio
import logging
import time
from typing import Callable, Dict, Optional, Text
from typing import Callable, Dict, List, Optional, Text

from aiohttp import ClientConnectorError
import backoff
Expand Down Expand Up @@ -260,20 +261,27 @@ def __init__(
self.enable_websocket = enable_websocket

async def connect(
self, test_login=False, wake_if_asleep=False, filtered_vins=None
self,
test_login: bool = False,
wake_if_asleep: bool = False,
filtered_vins: Optional[List[Text]] = None,
mfa_code: Text = "",
) -> Dict[Text, Text]:
"""Connect controller to Tesla.
Args
test_login (bool, optional): Whether to test credentials only. Defaults to False.
wake_if_asleep (bool, optional): Whether to wake up any sleeping cars to update state. Defaults to False.
filtered_vins (list, optional): If not empty, filters the cars by the provided VINs.
mfa_code (Text, optional): MFA code to use for connection
Returns
Dict[Text, Text]: Returns the refresh_token, access_token, and expires_in time
"""

if mfa_code:
self.__connection.mfa_code = mfa_code
cars = await self.get_vehicles()
self._last_attempted_update_time = time.time()
self.__update_lock = asyncio.Lock()
Expand Down Expand Up @@ -363,6 +371,15 @@ def set_authorization_code(self, code: Text) -> None:
"""Set authorization code in Connection."""
self.__connection.code = code

def set_authorization_domain(self, domain: Text) -> None:
"""Set authorization domain in Connection."""
if not domain:
return
if self.__connection.auth_domain.host != domain:
self.__connection.auth_domain = self.__connection.auth_domain.with_host(
domain
)

def register_websocket_callback(self, callback) -> int:
"""Register callback for websocket messages.
Expand Down
8 changes: 5 additions & 3 deletions teslajsonpy/teslaproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ async def test_url(
if code:
username = data.get("identity")
self._callback_url = self.init_query.get("callback_url")
self.waf_retry = 0
_LOGGER.debug("Success! Oauth code %s for %s captured.", code, username)
# 302 redirect
return URL(self._callback_url).update_query(
{"code": code, "username": username}
{"code": code, "username": username, "domain": self._host_url.host}
)
if resp.content_type == "text/html":
text = await resp.text()
Expand All @@ -99,8 +100,9 @@ async def test_url(
return return_timer_countdown_refresh_html(
max(30 * (self.waf_retry - self.waf_limit), 120)
if self.waf_retry > self.waf_limit
else random.random() * self.waf_retry + 5,
else random.random() * self.waf_retry + 10,
f"Detected Tesla web application firewall block #{self.waf_retry}. Please wait and then reload the page or wait for the auto reload.",
False,
)
self.waf_retry = 0
if resp.content_type == "application/json":
Expand Down Expand Up @@ -181,7 +183,7 @@ async def modify_headers(
result = await super().modify_headers(site, request)
method = request.method
if (
str(site) == "https://auth.tesla.com/oauth2/v3/authorize/mfa/verify"
str(site.path) == "/oauth2/v3/authorize/mfa/verify"
and method == "POST"
and not await request.post()
):
Expand Down

0 comments on commit afcc0ad

Please sign in to comment.