Skip to content

Commit

Permalink
fix: use oauth3 login
Browse files Browse the repository at this point in the history
  • Loading branch information
alandtse committed Jan 31, 2021
1 parent f193c64 commit 487c7b4
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 4 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ black = "*"
[packages]
aiohttp = "*"
backoff = "*"
bs4 = "*"
wrapt = "*"

[pipenv]
Expand Down
172 changes: 168 additions & 4 deletions teslajsonpy/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@
https://github.com/zabuldon/teslajsonpy
"""
import asyncio
import base64
import calendar
import datetime
import hashlib
import json
import logging
import secrets
import time
from typing import Dict, Text
from bs4 import BeautifulSoup
import yarl

import aiohttp
from yarl import URL
Expand All @@ -32,6 +37,7 @@ def __init__(
password: Text = None,
access_token: Text = None,
refresh_token: Text = None,
authorization_token=None,
expiration: int = 0,
) -> None:
"""Initialize connection object."""
Expand All @@ -52,21 +58,28 @@ def __init__(
self.refresh_token = refresh_token
self.websession = websession
self.token_refreshed = False
self.generate_oauth(email, password, refresh_token)
self.code_verifier: Text = secrets.token_urlsafe(64)
self.sso_oauth: Dict[Text, Text] = {}
self.generate_oauth(email, password, refresh_token, authorization_token)
if self.access_token:
self.__sethead(access_token=self.access_token, expiration=self.expiration)
_LOGGER.debug("Connecting with existing access token")
self.websocket = None

def generate_oauth(
self, email: Text = None, password: Text = None, refresh_token: Text = None
self,
email: Text = None,
password: Text = None,
refresh_token: Text = None,
authorization_token=None,
) -> None:
"""Generate oauth header.
Args
email (Text, optional): Tesla account email address. Defaults to None.
password (Text, optional): Password for account. Defaults to None.
refresh_token (Text, optional): Refresh token. Defaults to None.
authorization_token (Text, optional): Authorization token. Defaults to None.
Raises
IncompleteCredentials
Expand All @@ -77,7 +90,13 @@ def generate_oauth(
"""
refresh_token = refresh_token or self.refresh_token
self.oauth = {"client_id": self.client_id, "client_secret": self.client_secret}
if email and password:
if authorization_token:
self.oauth = {"client_id": "ownerapi"}
self.oauth["grant_type"] = "authorization_code"
self.oauth["code"] = authorization_token
self.oauth["code_verifier"] = self.code_verifier
self.oauth["redirect_uri"] = "https://auth.tesla.com/void/callback"
elif email and password:
self.oauth["grant_type"] = "password"
self.oauth["email"] = email
self.oauth["password"] = password
Expand All @@ -104,7 +123,31 @@ async def post(self, command, method="post", data=None):
_LOGGER.debug(
"Requesting new oauth token using %s", self.oauth["grant_type"]
)
auth = await self.__open("/oauth/token", "post", data=self.oauth)
if not self.sso_oauth or (
now > self.sso_oauth.get("expires_in", 0)
and not self.sso_oauth.get("refresh_token")
):
await self.get_authorization_code()
auth = await self.get_sso_auth_token()
self.sso_oauth = {
"access_token": auth["access_token"],
"refresh_token": auth["refresh_token"],
"expires_in": auth["expires_in"] + now,
}
elif self.sso_oauth.get("refresh_token") and now > self.sso_oauth.get(
"expires_in", 0
):
auth = await self.refresh_access_token(
refresh_token=self.sso_oauth.get("refresh_token")
)
self.sso_oauth = {
"access_token": auth["access_token"],
"refresh_token": auth["refresh_token"],
"expires_in": auth["expires_in"] + now,
}
auth = await self.get_bearer_token(
access_token=self.sso_oauth.get("access_token")
)
self.__sethead(
access_token=auth["access_token"], expires_in=auth["expires_in"]
)
Expand Down Expand Up @@ -329,3 +372,124 @@ async def _process_messages() -> None:
# "tag": f"{vehicle_id}",
# }
# )

async def get_authorization_code(self) -> Text:
"""Get authorization code from the oauth3 login method."""
# https://tesla-api.timdorr.com/api-basics/authentication#step-2-obtain-an-authorization-code
code_challenge = str(
base64.urlsafe_b64encode(
hashlib.sha256(self.code_verifier.encode()).hexdigest().encode()
),
"utf-8",
)
state = secrets.token_urlsafe(64)
query = {
"client_id": "ownerapi",
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"redirect_uri": "https://auth.tesla.com/void/callback",
"response_type": "code",
"scope": "openid email offline_access",
"state": state,
}
url = yarl.URL("https://auth.tesla.com/oauth2/v3/authorize")
url = url.update_query(query)
resp = await self.websession.get(url)
html = await resp.text()
soup: BeautifulSoup = BeautifulSoup(html, "html.parser")
data = get_inputs(soup)
data["identity"] = self.oauth["email"]
data["credential"] = self.oauth["password"]
resp = await self.websession.post(url, data=data)
_process_resp(resp)
code_url = URL(resp.history[-1].url)
self.generate_oauth(authorization_token=code_url.query.get("code"))
return code_url.query.get("code")

async def get_sso_auth_token(self):
"""Get sso auth token."""
# https://tesla-api.timdorr.com/api-basics/authentication#step-2-obtain-an-authorization-code
_LOGGER.debug("Requesting new oauth token using %s", self.oauth["grant_type"])
if self.oauth["grant_type"] == "authorization_code":
auth = await self.__open(
"/oauth2/v3/token",
"post",
data=self.oauth,
baseurl="https://auth.tesla.com",
)
else:
auth = await self.__open("/oauth/token", "post", data=self.oauth)
return auth

async def refresh_access_token(self, refresh_token):
"""Refresh access token from sso."""
# https://tesla-api.timdorr.com/api-basics/authentication#refreshing-an-access-token
if not refresh_token:
_LOGGER.debug("Missing refresh token")
return
_LOGGER.debug("Refreshing access token with refresh_token")
oauth = {
"client_id": "ownerapi",
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": "openid email offline_access",
}
auth = await self.__open(
"/oauth2/v3/token", "post", data=oauth, baseurl="https://auth.tesla.com",
)
return auth

async def get_bearer_token(self, access_token):
"""Get bearer token. This is used by the owners API."""
# https://tesla-api.timdorr.com/api-basics/authentication#step-4-exchange-bearer-token-for-access-token
if not access_token:
_LOGGER.debug("Missing access token")
return
_LOGGER.debug("Exchanging bearer token with access token:")
oauth = {
"client_id": self.client_id,
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
}
head = {
"Authorization": f"Bearer {access_token}",
}
auth = await self.__open("/oauth/token", "post", headers=head, data=oauth)
return auth


def get_inputs(soup: BeautifulSoup, searchfield=None) -> Dict[str, str]:
"""Parse soup for form with searchfield."""
searchfield = searchfield or {"id": "form"}
data = {}
form = soup.find("form", searchfield)
if not form:
form = soup.find("form")
for field in form.find_all("input"):
try:
data[field["name"]] = ""
if field["type"] and field["type"] == "hidden":
data[field["name"]] = field["value"]
except BaseException: # pylint: disable=broad-except
pass
return data


def _process_resp(resp) -> Text:
if resp.history:
for item in resp.history:
_LOGGER.debug("%s: redirected from\n%s", item.method, item.url)
url = str(resp.request_info.url)
method = resp.request_info.method
status = resp.status
reason = resp.reason
headers = resp.request_info.headers
_LOGGER.debug(
"%s: \n%s with\n%s\n returned %s:%s with response %s",
method,
url,
headers,
status,
reason,
resp.headers,
)
return url

0 comments on commit 487c7b4

Please sign in to comment.